libceph, rbd: ignore addr->type while comparing in some cases
[linux-2.6-block.git] / net/ceph/mon_client.c
// SPDX-License-Identifier: GPL-2.0
#include <linux/ceph/ceph_debug.h>

#include <linux/module.h>
#include <linux/types.h>
#include <linux/slab.h>
#include <linux/random.h>
#include <linux/sched.h>

#include <linux/ceph/ceph_features.h>
#include <linux/ceph/mon_client.h>
#include <linux/ceph/libceph.h>
#include <linux/ceph/debugfs.h>
#include <linux/ceph/decode.h>
#include <linux/ceph/auth.h>

/*
 * Interact with Ceph monitor cluster.  Handle requests for new map
 * versions, and periodically resend as needed.  Also implement
 * statfs() and umount().
 *
 * A small cluster of Ceph "monitors" are responsible for managing critical
 * cluster configuration and state information.  An odd number (e.g., 3, 5)
 * of cmon daemons use a modified version of the Paxos part-time parliament
 * algorithm to manage the MDS map (mds cluster membership), OSD map, and
 * list of clients who have mounted the file system.
 *
 * We maintain an open, active session with a monitor at all times in order to
 * receive timely MDSMap updates.  We periodically send a keepalive byte on the
 * TCP socket to ensure we detect a failure.  If the connection does break, we
 * randomly hunt for a new monitor.  Once the connection is reestablished, we
 * resend any outstanding requests.
 */

static const struct ceph_connection_operations mon_con_ops;

static int __validate_auth(struct ceph_mon_client *monc);

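/*
 * Decode a single mon_info_t: skip the mon name and extract the
 * monitor address (an addrvec on msgr2-aware encodings).
 */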
static int decode_mon_info(void **p, void *end, bool msgr2,
			   struct ceph_entity_addr *addr)
{
	void *mon_info_end;
	u32 struct_len;
	u8 struct_v;
	int ret;

	ret = ceph_start_decoding(p, end, 1, "mon_info_t", &struct_v,
				  &struct_len);
	if (ret)
		return ret;

	mon_info_end = *p + struct_len;
	ceph_decode_skip_string(p, end, e_inval);  /* skip mon name */
	ret = ceph_decode_entity_addrvec(p, end, msgr2, addr);
	if (ret)
		return ret;

	*p = mon_info_end;
	return 0;

e_inval:
	return -EINVAL;
}

/*
 * Decode a monmap blob (e.g., during mount).
 *
 * Assume MonMap v3 (i.e. encoding with MONNAMES and MONENC).
 */
static struct ceph_monmap *ceph_monmap_decode(void **p, void *end, bool msgr2)
{
	struct ceph_monmap *monmap = NULL;
	struct ceph_fsid fsid;
	u32 struct_len;
	int blob_len;
	int num_mon;
	u8 struct_v;
	u32 epoch;
	int ret;
	int i;

	ceph_decode_32_safe(p, end, blob_len, e_inval);
	ceph_decode_need(p, end, blob_len, e_inval);

	ret = ceph_start_decoding(p, end, 6, "monmap", &struct_v, &struct_len);
	if (ret)
		goto fail;

	dout("%s struct_v %d\n", __func__, struct_v);
	ceph_decode_copy_safe(p, end, &fsid, sizeof(fsid), e_inval);
	ceph_decode_32_safe(p, end, epoch, e_inval);
	if (struct_v >= 6) {
		u32 feat_struct_len;
		u8 feat_struct_v;

		*p += sizeof(struct ceph_timespec);  /* skip last_changed */
		*p += sizeof(struct ceph_timespec);  /* skip created */

		ret = ceph_start_decoding(p, end, 1, "mon_feature_t",
					  &feat_struct_v, &feat_struct_len);
		if (ret)
			goto fail;

		*p += feat_struct_len;  /* skip persistent_features */

		ret = ceph_start_decoding(p, end, 1, "mon_feature_t",
					  &feat_struct_v, &feat_struct_len);
		if (ret)
			goto fail;

		*p += feat_struct_len;  /* skip optional_features */
	}
	ceph_decode_32_safe(p, end, num_mon, e_inval);

	dout("%s fsid %pU epoch %u num_mon %d\n", __func__, &fsid, epoch,
	     num_mon);
	if (num_mon > CEPH_MAX_MON)
		goto e_inval;

	monmap = kmalloc(struct_size(monmap, mon_inst, num_mon), GFP_NOIO);
	if (!monmap) {
		ret = -ENOMEM;
		goto fail;
	}
	monmap->fsid = fsid;
	monmap->epoch = epoch;
	monmap->num_mon = num_mon;

	/* legacy_mon_addr map or mon_info map */
	for (i = 0; i < num_mon; i++) {
		struct ceph_entity_inst *inst = &monmap->mon_inst[i];

		ceph_decode_skip_string(p, end, e_inval);  /* skip mon name */
		inst->name.type = CEPH_ENTITY_TYPE_MON;
		inst->name.num = cpu_to_le64(i);

		if (struct_v >= 6)
			ret = decode_mon_info(p, end, msgr2, &inst->addr);
		else
			ret = ceph_decode_entity_addr(p, end, &inst->addr);
		if (ret)
			goto fail;

		dout("%s mon%d addr %s\n", __func__, i,
		     ceph_pr_addr(&inst->addr));
	}

	return monmap;

e_inval:
	ret = -EINVAL;
fail:
	kfree(monmap);
	return ERR_PTR(ret);
}

/*
 * return true if *addr is included in the monmap.
 */
int ceph_monmap_contains(struct ceph_monmap *m, struct ceph_entity_addr *addr)
{
	int i;

	for (i = 0; i < m->num_mon; i++) {
		if (ceph_addr_equal_no_type(addr, &m->mon_inst[i].addr))
			return 1;
	}

	return 0;
}

/*
 * Send an auth request.
 */
static void __send_prepared_auth_request(struct ceph_mon_client *monc, int len)
{
	monc->pending_auth = 1;
	monc->m_auth->front.iov_len = len;
	monc->m_auth->hdr.front_len = cpu_to_le32(len);
	ceph_msg_revoke(monc->m_auth);
	ceph_msg_get(monc->m_auth);  /* keep our ref */
	ceph_con_send(&monc->con, monc->m_auth);
}

/*
 * Close monitor session, if any.
 */
static void __close_session(struct ceph_mon_client *monc)
{
	dout("__close_session closing mon%d\n", monc->cur_mon);
	ceph_msg_revoke(monc->m_auth);
	ceph_msg_revoke_incoming(monc->m_auth_reply);
	ceph_msg_revoke(monc->m_subscribe);
	ceph_msg_revoke_incoming(monc->m_subscribe_ack);
	ceph_con_close(&monc->con);

	monc->pending_auth = 0;
	ceph_auth_reset(monc->auth);
}

/*
 * Pick a new monitor at random and set cur_mon.  If we are repicking
 * (i.e. cur_mon is already set), be sure to pick a different one.
 */
static void pick_new_mon(struct ceph_mon_client *monc)
{
	int old_mon = monc->cur_mon;

	BUG_ON(monc->monmap->num_mon < 1);

	if (monc->monmap->num_mon == 1) {
		monc->cur_mon = 0;
	} else {
		int max = monc->monmap->num_mon;
		int o = -1;
		int n;

		if (monc->cur_mon >= 0) {
			if (monc->cur_mon < monc->monmap->num_mon)
				o = monc->cur_mon;
			if (o >= 0)
				max--;
		}

		n = prandom_u32() % max;
		if (o >= 0 && n >= o)
			n++;

		monc->cur_mon = n;
	}

	dout("%s mon%d -> mon%d out of %d mons\n", __func__, old_mon,
	     monc->cur_mon, monc->monmap->num_mon);
}

/*
 * Open a session with a new monitor.
 */
static void __open_session(struct ceph_mon_client *monc)
{
	int ret;

	pick_new_mon(monc);

	monc->hunting = true;
	if (monc->had_a_connection) {
		monc->hunt_mult *= CEPH_MONC_HUNT_BACKOFF;
		if (monc->hunt_mult > CEPH_MONC_HUNT_MAX_MULT)
			monc->hunt_mult = CEPH_MONC_HUNT_MAX_MULT;
	}

	monc->sub_renew_after = jiffies;  /* i.e., expired */
	monc->sub_renew_sent = 0;

	dout("%s opening mon%d\n", __func__, monc->cur_mon);
	ceph_con_open(&monc->con, CEPH_ENTITY_TYPE_MON, monc->cur_mon,
		      &monc->monmap->mon_inst[monc->cur_mon].addr);

	/*
	 * send an initial keepalive to ensure our timestamp is valid
	 * by the time we are in an OPENED state
	 */
	ceph_con_keepalive(&monc->con);

	/* initiate authentication handshake */
	ret = ceph_auth_build_hello(monc->auth,
				    monc->m_auth->front.iov_base,
				    monc->m_auth->front_alloc_len);
	BUG_ON(ret <= 0);
	__send_prepared_auth_request(monc, ret);
}

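/*
 * The current session is bad.  Close it and open a new one with a
 * (possibly different) monitor.
 */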
static void reopen_session(struct ceph_mon_client *monc)
{
	if (!monc->hunting)
		pr_info("mon%d %s session lost, hunting for new mon\n",
			monc->cur_mon, ceph_pr_addr(&monc->con.peer_addr));

	__close_session(monc);
	__open_session(monc);
}

void ceph_monc_reopen_session(struct ceph_mon_client *monc)
{
	mutex_lock(&monc->mutex);
	reopen_session(monc);
	mutex_unlock(&monc->mutex);
}

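/*
 * Reduce the hunting backoff multiplier, bottoming out at 1.
 */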
static void un_backoff(struct ceph_mon_client *monc)
{
	monc->hunt_mult /= 2;  /* reduce by 50% */
	if (monc->hunt_mult < 1)
		monc->hunt_mult = 1;
	dout("%s hunt_mult now %d\n", __func__, monc->hunt_mult);
}

/*
 * Reschedule delayed work timer.
 */
static void __schedule_delayed(struct ceph_mon_client *monc)
{
	unsigned long delay;

	if (monc->hunting)
		delay = CEPH_MONC_HUNT_INTERVAL * monc->hunt_mult;
	else
		delay = CEPH_MONC_PING_INTERVAL;

	dout("__schedule_delayed after %lu\n", delay);
	mod_delayed_work(system_wq, &monc->delayed_work,
			 round_jiffies_relative(delay));
}

const char *ceph_sub_str[] = {
	[CEPH_SUB_MONMAP] = "monmap",
	[CEPH_SUB_OSDMAP] = "osdmap",
	[CEPH_SUB_FSMAP]  = "fsmap.user",
	[CEPH_SUB_MDSMAP] = "mdsmap",
};

/*
 * Send subscribe request for one or more maps, according to
 * monc->subs.
 */
static void __send_subscribe(struct ceph_mon_client *monc)
{
	struct ceph_msg *msg = monc->m_subscribe;
	void *p = msg->front.iov_base;
	void *const end = p + msg->front_alloc_len;
	int num = 0;
	int i;

	dout("%s sent %lu\n", __func__, monc->sub_renew_sent);

	BUG_ON(monc->cur_mon < 0);

	if (!monc->sub_renew_sent)
		monc->sub_renew_sent = jiffies | 1;  /* never 0 */

	msg->hdr.version = cpu_to_le16(2);

	for (i = 0; i < ARRAY_SIZE(monc->subs); i++) {
		if (monc->subs[i].want)
			num++;
	}
	BUG_ON(num < 1);  /* monmap sub is always there */
	ceph_encode_32(&p, num);
	for (i = 0; i < ARRAY_SIZE(monc->subs); i++) {
		char buf[32];
		int len;

		if (!monc->subs[i].want)
			continue;

		len = sprintf(buf, "%s", ceph_sub_str[i]);
		if (i == CEPH_SUB_MDSMAP &&
		    monc->fs_cluster_id != CEPH_FS_CLUSTER_ID_NONE)
			len += sprintf(buf + len, ".%d", monc->fs_cluster_id);

		dout("%s %s start %llu flags 0x%x\n", __func__, buf,
		     le64_to_cpu(monc->subs[i].item.start),
		     monc->subs[i].item.flags);
		ceph_encode_string(&p, end, buf, len);
		memcpy(p, &monc->subs[i].item, sizeof(monc->subs[i].item));
		p += sizeof(monc->subs[i].item);
	}

	BUG_ON(p > end);
	msg->front.iov_len = p - msg->front.iov_base;
	msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
	ceph_msg_revoke(msg);
	ceph_con_send(&monc->con, ceph_msg_get(msg));
}

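/*
 * Handle a subscribe ack: on legacy (pre-statefulsub) monitors, note
 * when the current subscriptions must be renewed.
 */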
static void handle_subscribe_ack(struct ceph_mon_client *monc,
				 struct ceph_msg *msg)
{
	unsigned int seconds;
	struct ceph_mon_subscribe_ack *h = msg->front.iov_base;

	if (msg->front.iov_len < sizeof(*h))
		goto bad;
	seconds = le32_to_cpu(h->duration);

	mutex_lock(&monc->mutex);
	if (monc->sub_renew_sent) {
		/*
		 * This is only needed for legacy (infernalis or older)
		 * MONs -- see delayed_work().
		 */
		monc->sub_renew_after = monc->sub_renew_sent +
					    (seconds >> 1) * HZ - 1;
		dout("%s sent %lu duration %d renew after %lu\n", __func__,
		     monc->sub_renew_sent, seconds, monc->sub_renew_after);
		monc->sub_renew_sent = 0;
	} else {
		dout("%s sent %lu renew after %lu, ignoring\n", __func__,
		     monc->sub_renew_sent, monc->sub_renew_after);
	}
	mutex_unlock(&monc->mutex);
	return;
bad:
	pr_err("got corrupt subscribe-ack msg\n");
	ceph_msg_dump(msg);
}

/*
 * Register interest in a map
 *
 * @sub: one of CEPH_SUB_*
 * @epoch: X for "every map since X", or 0 for "just the latest"
 */
static bool __ceph_monc_want_map(struct ceph_mon_client *monc, int sub,
				 u32 epoch, bool continuous)
{
	__le64 start = cpu_to_le64(epoch);
	u8 flags = !continuous ? CEPH_SUBSCRIBE_ONETIME : 0;

	dout("%s %s epoch %u continuous %d\n", __func__, ceph_sub_str[sub],
	     epoch, continuous);

	if (monc->subs[sub].want &&
	    monc->subs[sub].item.start == start &&
	    monc->subs[sub].item.flags == flags)
		return false;

	monc->subs[sub].item.start = start;
	monc->subs[sub].item.flags = flags;
	monc->subs[sub].want = true;

	return true;
}

bool ceph_monc_want_map(struct ceph_mon_client *monc, int sub, u32 epoch,
			bool continuous)
{
	bool need_request;

	mutex_lock(&monc->mutex);
	need_request = __ceph_monc_want_map(monc, sub, epoch, continuous);
	mutex_unlock(&monc->mutex);

	return need_request;
}
EXPORT_SYMBOL(ceph_monc_want_map);

/*
 * Keep track of which maps we have
 *
 * @sub: one of CEPH_SUB_*
 */
static void __ceph_monc_got_map(struct ceph_mon_client *monc, int sub,
				u32 epoch)
{
	dout("%s %s epoch %u\n", __func__, ceph_sub_str[sub], epoch);

	if (monc->subs[sub].want) {
		if (monc->subs[sub].item.flags & CEPH_SUBSCRIBE_ONETIME)
			monc->subs[sub].want = false;
		else
			monc->subs[sub].item.start = cpu_to_le64(epoch + 1);
	}

	monc->subs[sub].have = epoch;
}

void ceph_monc_got_map(struct ceph_mon_client *monc, int sub, u32 epoch)
{
	mutex_lock(&monc->mutex);
	__ceph_monc_got_map(monc, sub, epoch);
	mutex_unlock(&monc->mutex);
}
EXPORT_SYMBOL(ceph_monc_got_map);

void ceph_monc_renew_subs(struct ceph_mon_client *monc)
{
	mutex_lock(&monc->mutex);
	__send_subscribe(monc);
	mutex_unlock(&monc->mutex);
}
EXPORT_SYMBOL(ceph_monc_renew_subs);

/*
 * Wait for an osdmap with a given epoch.
 *
 * @epoch: epoch to wait for
 * @timeout: in jiffies, 0 means "wait forever"
 */
int ceph_monc_wait_osdmap(struct ceph_mon_client *monc, u32 epoch,
			  unsigned long timeout)
{
	unsigned long started = jiffies;
	long ret;

	mutex_lock(&monc->mutex);
	while (monc->subs[CEPH_SUB_OSDMAP].have < epoch) {
		mutex_unlock(&monc->mutex);

		if (timeout && time_after_eq(jiffies, started + timeout))
			return -ETIMEDOUT;

		ret = wait_event_interruptible_timeout(monc->client->auth_wq,
				monc->subs[CEPH_SUB_OSDMAP].have >= epoch,
				ceph_timeout_jiffies(timeout));
		if (ret < 0)
			return ret;

		mutex_lock(&monc->mutex);
	}

	mutex_unlock(&monc->mutex);
	return 0;
}
EXPORT_SYMBOL(ceph_monc_wait_osdmap);

/*
 * Open a session with a random monitor.  Request monmap and osdmap,
 * which are waited upon in __ceph_open_session().
 */
int ceph_monc_open_session(struct ceph_mon_client *monc)
{
	mutex_lock(&monc->mutex);
	__ceph_monc_want_map(monc, CEPH_SUB_MONMAP, 0, true);
	__ceph_monc_want_map(monc, CEPH_SUB_OSDMAP, 0, false);
	__open_session(monc);
	__schedule_delayed(monc);
	mutex_unlock(&monc->mutex);
	return 0;
}
EXPORT_SYMBOL(ceph_monc_open_session);

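/*
 * A monmap arrived: decode it, verify the fsid and install it as the
 * current monmap.
 */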
static void ceph_monc_handle_map(struct ceph_mon_client *monc,
				 struct ceph_msg *msg)
{
	struct ceph_client *client = monc->client;
	struct ceph_monmap *monmap;
	void *p, *end;

	mutex_lock(&monc->mutex);

	dout("handle_monmap\n");
	p = msg->front.iov_base;
	end = p + msg->front.iov_len;

	monmap = ceph_monmap_decode(&p, end, false);
	if (IS_ERR(monmap)) {
		pr_err("problem decoding monmap, %d\n",
		       (int)PTR_ERR(monmap));
		ceph_msg_dump(msg);
		goto out;
	}

	if (ceph_check_fsid(client, &monmap->fsid) < 0) {
		kfree(monmap);
		goto out;
	}

	kfree(monc->monmap);
	monc->monmap = monmap;

	__ceph_monc_got_map(monc, CEPH_SUB_MONMAP, monc->monmap->epoch);
	client->have_fsid = true;

out:
	mutex_unlock(&monc->mutex);
	wake_up_all(&client->auth_wq);
}

/*
 * generic requests (currently statfs, mon_get_version)
 */
DEFINE_RB_FUNCS(generic_request, struct ceph_mon_generic_request, tid, node)

static void release_generic_request(struct kref *kref)
{
	struct ceph_mon_generic_request *req =
		container_of(kref, struct ceph_mon_generic_request, kref);

	dout("%s greq %p request %p reply %p\n", __func__, req, req->request,
	     req->reply);
	WARN_ON(!RB_EMPTY_NODE(&req->node));

	if (req->reply)
		ceph_msg_put(req->reply);
	if (req->request)
		ceph_msg_put(req->request);

	kfree(req);
}

static void put_generic_request(struct ceph_mon_generic_request *req)
{
	if (req)
		kref_put(&req->kref, release_generic_request);
}

static void get_generic_request(struct ceph_mon_generic_request *req)
{
	kref_get(&req->kref);
}

static struct ceph_mon_generic_request *
alloc_generic_request(struct ceph_mon_client *monc, gfp_t gfp)
{
	struct ceph_mon_generic_request *req;

	req = kzalloc(sizeof(*req), gfp);
	if (!req)
		return NULL;

	req->monc = monc;
	kref_init(&req->kref);
	RB_CLEAR_NODE(&req->node);
	init_completion(&req->completion);

	dout("%s greq %p\n", __func__, req);
	return req;
}

static void register_generic_request(struct ceph_mon_generic_request *req)
{
	struct ceph_mon_client *monc = req->monc;

	WARN_ON(req->tid);

	get_generic_request(req);
	req->tid = ++monc->last_tid;
	insert_generic_request(&monc->generic_request_tree, req);
}

static void send_generic_request(struct ceph_mon_client *monc,
				 struct ceph_mon_generic_request *req)
{
	WARN_ON(!req->tid);

	dout("%s greq %p tid %llu\n", __func__, req, req->tid);
	req->request->hdr.tid = cpu_to_le64(req->tid);
	ceph_con_send(&monc->con, ceph_msg_get(req->request));
}

static void __finish_generic_request(struct ceph_mon_generic_request *req)
{
	struct ceph_mon_client *monc = req->monc;

	dout("%s greq %p tid %llu\n", __func__, req, req->tid);
	erase_generic_request(&monc->generic_request_tree, req);

	ceph_msg_revoke(req->request);
	ceph_msg_revoke_incoming(req->reply);
}

static void finish_generic_request(struct ceph_mon_generic_request *req)
{
	__finish_generic_request(req);
	put_generic_request(req);
}

static void complete_generic_request(struct ceph_mon_generic_request *req)
{
	if (req->complete_cb)
		req->complete_cb(req);
	else
		complete_all(&req->completion);
	put_generic_request(req);
}

static void cancel_generic_request(struct ceph_mon_generic_request *req)
{
	struct ceph_mon_client *monc = req->monc;
	struct ceph_mon_generic_request *lookup_req;

	dout("%s greq %p tid %llu\n", __func__, req, req->tid);

	mutex_lock(&monc->mutex);
	lookup_req = lookup_generic_request(&monc->generic_request_tree,
					    req->tid);
	if (lookup_req) {
		WARN_ON(lookup_req != req);
		finish_generic_request(req);
	}

	mutex_unlock(&monc->mutex);
}

static int wait_generic_request(struct ceph_mon_generic_request *req)
{
	int ret;

	dout("%s greq %p tid %llu\n", __func__, req, req->tid);
	ret = wait_for_completion_interruptible(&req->completion);
	if (ret)
		cancel_generic_request(req);
	else
		ret = req->result;  /* completed */

	return ret;
}

static struct ceph_msg *get_generic_reply(struct ceph_connection *con,
					  struct ceph_msg_header *hdr,
					  int *skip)
{
	struct ceph_mon_client *monc = con->private;
	struct ceph_mon_generic_request *req;
	u64 tid = le64_to_cpu(hdr->tid);
	struct ceph_msg *m;

	mutex_lock(&monc->mutex);
	req = lookup_generic_request(&monc->generic_request_tree, tid);
	if (!req) {
		dout("get_generic_reply %lld dne\n", tid);
		*skip = 1;
		m = NULL;
	} else {
		dout("get_generic_reply %lld got %p\n", tid, req->reply);
		*skip = 0;
		m = ceph_msg_get(req->reply);
		/*
		 * we don't need to track the connection reading into
		 * this reply because we only have one open connection
		 * at a time, ever.
		 */
	}
	mutex_unlock(&monc->mutex);
	return m;
}

/*
 * statfs
 */
static void handle_statfs_reply(struct ceph_mon_client *monc,
				struct ceph_msg *msg)
{
	struct ceph_mon_generic_request *req;
	struct ceph_mon_statfs_reply *reply = msg->front.iov_base;
	u64 tid = le64_to_cpu(msg->hdr.tid);

	dout("%s msg %p tid %llu\n", __func__, msg, tid);

	if (msg->front.iov_len != sizeof(*reply))
		goto bad;

	mutex_lock(&monc->mutex);
	req = lookup_generic_request(&monc->generic_request_tree, tid);
	if (!req) {
		mutex_unlock(&monc->mutex);
		return;
	}

	req->result = 0;
	*req->u.st = reply->st;  /* struct */
	__finish_generic_request(req);
	mutex_unlock(&monc->mutex);

	complete_generic_request(req);
	return;

bad:
	pr_err("corrupt statfs reply, tid %llu\n", tid);
	ceph_msg_dump(msg);
}

/*
 * Do a synchronous statfs().
 */
int ceph_monc_do_statfs(struct ceph_mon_client *monc, u64 data_pool,
			struct ceph_statfs *buf)
{
	struct ceph_mon_generic_request *req;
	struct ceph_mon_statfs *h;
	int ret = -ENOMEM;

	req = alloc_generic_request(monc, GFP_NOFS);
	if (!req)
		goto out;

	req->request = ceph_msg_new(CEPH_MSG_STATFS, sizeof(*h), GFP_NOFS,
				    true);
	if (!req->request)
		goto out;

	req->reply = ceph_msg_new(CEPH_MSG_STATFS_REPLY, 64, GFP_NOFS, true);
	if (!req->reply)
		goto out;

	req->u.st = buf;
	req->request->hdr.version = cpu_to_le16(2);

	mutex_lock(&monc->mutex);
	register_generic_request(req);
	/* fill out request */
	h = req->request->front.iov_base;
	h->monhdr.have_version = 0;
	h->monhdr.session_mon = cpu_to_le16(-1);
	h->monhdr.session_mon_tid = 0;
	h->fsid = monc->monmap->fsid;
	h->contains_data_pool = (data_pool != CEPH_NOPOOL);
	h->data_pool = cpu_to_le64(data_pool);
	send_generic_request(monc, req);
	mutex_unlock(&monc->mutex);

	ret = wait_generic_request(req);
out:
	put_generic_request(req);
	return ret;
}
EXPORT_SYMBOL(ceph_monc_do_statfs);

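/*
 * Handle a mon_get_version reply: record the newest map version and
 * complete the matching generic request.
 */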
static void handle_get_version_reply(struct ceph_mon_client *monc,
				     struct ceph_msg *msg)
{
	struct ceph_mon_generic_request *req;
	u64 tid = le64_to_cpu(msg->hdr.tid);
	void *p = msg->front.iov_base;
	void *end = p + msg->front_alloc_len;
	u64 handle;

	dout("%s msg %p tid %llu\n", __func__, msg, tid);

	ceph_decode_need(&p, end, 2*sizeof(u64), bad);
	handle = ceph_decode_64(&p);
	if (tid != 0 && tid != handle)
		goto bad;

	mutex_lock(&monc->mutex);
	req = lookup_generic_request(&monc->generic_request_tree, handle);
	if (!req) {
		mutex_unlock(&monc->mutex);
		return;
	}

	req->result = 0;
	req->u.newest = ceph_decode_64(&p);
	__finish_generic_request(req);
	mutex_unlock(&monc->mutex);

	complete_generic_request(req);
	return;

bad:
	pr_err("corrupt mon_get_version reply, tid %llu\n", tid);
	ceph_msg_dump(msg);
}

static struct ceph_mon_generic_request *
__ceph_monc_get_version(struct ceph_mon_client *monc, const char *what,
			ceph_monc_callback_t cb, u64 private_data)
{
	struct ceph_mon_generic_request *req;

	req = alloc_generic_request(monc, GFP_NOIO);
	if (!req)
		goto err_put_req;

	req->request = ceph_msg_new(CEPH_MSG_MON_GET_VERSION,
				    sizeof(u64) + sizeof(u32) + strlen(what),
				    GFP_NOIO, true);
	if (!req->request)
		goto err_put_req;

	req->reply = ceph_msg_new(CEPH_MSG_MON_GET_VERSION_REPLY, 32, GFP_NOIO,
				  true);
	if (!req->reply)
		goto err_put_req;

	req->complete_cb = cb;
	req->private_data = private_data;

	mutex_lock(&monc->mutex);
	register_generic_request(req);
	{
		void *p = req->request->front.iov_base;
		void *const end = p + req->request->front_alloc_len;

		ceph_encode_64(&p, req->tid);  /* handle */
		ceph_encode_string(&p, end, what, strlen(what));
		WARN_ON(p != end);
	}
	send_generic_request(monc, req);
	mutex_unlock(&monc->mutex);

	return req;

err_put_req:
	put_generic_request(req);
	return ERR_PTR(-ENOMEM);
}

/*
 * Send MMonGetVersion and wait for the reply.
 *
 * @what: one of "mdsmap", "osdmap" or "monmap"
 */
int ceph_monc_get_version(struct ceph_mon_client *monc, const char *what,
			  u64 *newest)
{
	struct ceph_mon_generic_request *req;
	int ret;

	req = __ceph_monc_get_version(monc, what, NULL, 0);
	if (IS_ERR(req))
		return PTR_ERR(req);

	ret = wait_generic_request(req);
	if (!ret)
		*newest = req->u.newest;

	put_generic_request(req);
	return ret;
}
EXPORT_SYMBOL(ceph_monc_get_version);

/*
 * Send MMonGetVersion without waiting for the reply.  The reply is
 * delivered via the provided callback.
 *
 * @what: one of "mdsmap", "osdmap" or "monmap"
 */
int ceph_monc_get_version_async(struct ceph_mon_client *monc, const char *what,
				ceph_monc_callback_t cb, u64 private_data)
{
	struct ceph_mon_generic_request *req;

	req = __ceph_monc_get_version(monc, what, cb, private_data);
	if (IS_ERR(req))
		return PTR_ERR(req);

	put_generic_request(req);
	return 0;
}
EXPORT_SYMBOL(ceph_monc_get_version_async);

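/*
 * Handle a mon_command ack: the command's return code becomes the
 * result of the matching generic request.
 */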
static void handle_command_ack(struct ceph_mon_client *monc,
			       struct ceph_msg *msg)
{
	struct ceph_mon_generic_request *req;
	void *p = msg->front.iov_base;
	void *const end = p + msg->front_alloc_len;
	u64 tid = le64_to_cpu(msg->hdr.tid);

	dout("%s msg %p tid %llu\n", __func__, msg, tid);

	ceph_decode_need(&p, end, sizeof(struct ceph_mon_request_header) +
				  sizeof(u32), bad);
	p += sizeof(struct ceph_mon_request_header);

	mutex_lock(&monc->mutex);
	req = lookup_generic_request(&monc->generic_request_tree, tid);
	if (!req) {
		mutex_unlock(&monc->mutex);
		return;
	}

	req->result = ceph_decode_32(&p);
	__finish_generic_request(req);
	mutex_unlock(&monc->mutex);

	complete_generic_request(req);
	return;

bad:
	pr_err("corrupt mon_command ack, tid %llu\n", tid);
	ceph_msg_dump(msg);
}

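/*
 * Send a mon_command (a single JSON-formatted command string) and
 * wait for the ack.
 */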
static __printf(2, 0)
int do_mon_command_vargs(struct ceph_mon_client *monc, const char *fmt,
			 va_list ap)
{
	struct ceph_mon_generic_request *req;
	struct ceph_mon_command *h;
	int ret = -ENOMEM;
	int len;

	req = alloc_generic_request(monc, GFP_NOIO);
	if (!req)
		goto out;

	req->request = ceph_msg_new(CEPH_MSG_MON_COMMAND, 256, GFP_NOIO, true);
	if (!req->request)
		goto out;

	req->reply = ceph_msg_new(CEPH_MSG_MON_COMMAND_ACK, 512, GFP_NOIO,
				  true);
	if (!req->reply)
		goto out;

	mutex_lock(&monc->mutex);
	register_generic_request(req);
	h = req->request->front.iov_base;
	h->monhdr.have_version = 0;
	h->monhdr.session_mon = cpu_to_le16(-1);
	h->monhdr.session_mon_tid = 0;
	h->fsid = monc->monmap->fsid;
	h->num_strs = cpu_to_le32(1);
	len = vsprintf(h->str, fmt, ap);
	h->str_len = cpu_to_le32(len);
	send_generic_request(monc, req);
	mutex_unlock(&monc->mutex);

	ret = wait_generic_request(req);
out:
	put_generic_request(req);
	return ret;
}

static __printf(2, 3)
int do_mon_command(struct ceph_mon_client *monc, const char *fmt, ...)
{
	va_list ap;
	int ret;

	va_start(ap, fmt);
	ret = do_mon_command_vargs(monc, fmt, ap);
	va_end(ap);
	return ret;
}

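/*
 * Ask the monitor to blocklist client_addr, then wait for an osdmap
 * that reflects the new entry.
 */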
int ceph_monc_blocklist_add(struct ceph_mon_client *monc,
			    struct ceph_entity_addr *client_addr)
{
	int ret;

	ret = do_mon_command(monc,
			     "{ \"prefix\": \"osd blocklist\", \
				\"blocklistop\": \"add\", \
				\"addr\": \"%pISpc/%u\" }",
			     &client_addr->in_addr,
			     le32_to_cpu(client_addr->nonce));
	if (ret == -EINVAL) {
		/*
		 * The monitor returns EINVAL on an unrecognized command.
		 * Try the legacy command -- it is exactly the same except
		 * for the name.
		 */
		ret = do_mon_command(monc,
				     "{ \"prefix\": \"osd blacklist\", \
					\"blacklistop\": \"add\", \
					\"addr\": \"%pISpc/%u\" }",
				     &client_addr->in_addr,
				     le32_to_cpu(client_addr->nonce));
	}
	if (ret)
		return ret;

	/*
	 * Make sure we have the osdmap that includes the blocklist
	 * entry.  This is needed to ensure that the OSDs pick up the
	 * new blocklist before processing any future requests from
	 * this client.
	 */
	return ceph_wait_for_latest_osdmap(monc->client, 0);
}
EXPORT_SYMBOL(ceph_monc_blocklist_add);

/*
 * Resend pending generic requests.
 */
static void __resend_generic_request(struct ceph_mon_client *monc)
{
	struct ceph_mon_generic_request *req;
	struct rb_node *p;

	for (p = rb_first(&monc->generic_request_tree); p; p = rb_next(p)) {
		req = rb_entry(p, struct ceph_mon_generic_request, node);
		ceph_msg_revoke(req->request);
		ceph_msg_revoke_incoming(req->reply);
		ceph_con_send(&monc->con, ceph_msg_get(req->request));
	}
}

/*
 * Delayed work.  If we haven't mounted yet, retry.  Otherwise,
 * renew/retry subscription as needed (in case it is timing out, or we
 * got an ENOMEM).  And keep the monitor connection alive.
 */
static void delayed_work(struct work_struct *work)
{
	struct ceph_mon_client *monc =
		container_of(work, struct ceph_mon_client, delayed_work.work);

	dout("monc delayed_work\n");
	mutex_lock(&monc->mutex);
	if (monc->hunting) {
		dout("%s continuing hunt\n", __func__);
		reopen_session(monc);
	} else {
		int is_auth = ceph_auth_is_authenticated(monc->auth);

		if (ceph_con_keepalive_expired(&monc->con,
					       CEPH_MONC_PING_TIMEOUT)) {
			dout("monc keepalive timeout\n");
			is_auth = 0;
			reopen_session(monc);
		}

		if (!monc->hunting) {
			ceph_con_keepalive(&monc->con);
			__validate_auth(monc);
			un_backoff(monc);
		}

		if (is_auth &&
		    !(monc->con.peer_features & CEPH_FEATURE_MON_STATEFUL_SUB)) {
			unsigned long now = jiffies;

			dout("%s renew subs? now %lu renew after %lu\n",
			     __func__, now, monc->sub_renew_after);
			if (time_after_eq(now, monc->sub_renew_after))
				__send_subscribe(monc);
		}
	}
	__schedule_delayed(monc);
	mutex_unlock(&monc->mutex);
}

/*
 * On startup, we build a temporary monmap populated with the IPs
 * provided by mount(2).
 */
static int build_initial_monmap(struct ceph_mon_client *monc)
{
	struct ceph_options *opt = monc->client->options;
	struct ceph_entity_addr *mon_addr = opt->mon_addr;
	int num_mon = opt->num_mon;
	int i;

	/* build initial monmap */
	monc->monmap = kzalloc(struct_size(monc->monmap, mon_inst, num_mon),
			       GFP_KERNEL);
	if (!monc->monmap)
		return -ENOMEM;
	for (i = 0; i < num_mon; i++) {
		monc->monmap->mon_inst[i].addr = mon_addr[i];
		monc->monmap->mon_inst[i].addr.nonce = 0;
		monc->monmap->mon_inst[i].name.type =
			CEPH_ENTITY_TYPE_MON;
		monc->monmap->mon_inst[i].name.num = cpu_to_le64(i);
	}
	monc->monmap->num_mon = num_mon;
	return 0;
}

int ceph_monc_init(struct ceph_mon_client *monc, struct ceph_client *cl)
{
	int err = 0;

	dout("init\n");
	memset(monc, 0, sizeof(*monc));
	monc->client = cl;
	monc->monmap = NULL;
	mutex_init(&monc->mutex);

	err = build_initial_monmap(monc);
	if (err)
		goto out;

	/* connection */
	/* authentication */
	monc->auth = ceph_auth_init(cl->options->name,
				    cl->options->key);
	if (IS_ERR(monc->auth)) {
		err = PTR_ERR(monc->auth);
		goto out_monmap;
	}
	monc->auth->want_keys =
		CEPH_ENTITY_TYPE_AUTH | CEPH_ENTITY_TYPE_MON |
		CEPH_ENTITY_TYPE_OSD | CEPH_ENTITY_TYPE_MDS;

	/* msgs */
	err = -ENOMEM;
	monc->m_subscribe_ack = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE_ACK,
				     sizeof(struct ceph_mon_subscribe_ack),
				     GFP_KERNEL, true);
	if (!monc->m_subscribe_ack)
		goto out_auth;

	monc->m_subscribe = ceph_msg_new(CEPH_MSG_MON_SUBSCRIBE, 128,
					 GFP_KERNEL, true);
	if (!monc->m_subscribe)
		goto out_subscribe_ack;

	monc->m_auth_reply = ceph_msg_new(CEPH_MSG_AUTH_REPLY, 4096,
					  GFP_KERNEL, true);
	if (!monc->m_auth_reply)
		goto out_subscribe;

	monc->m_auth = ceph_msg_new(CEPH_MSG_AUTH, 4096, GFP_KERNEL, true);
	monc->pending_auth = 0;
	if (!monc->m_auth)
		goto out_auth_reply;

	ceph_con_init(&monc->con, monc, &mon_con_ops,
		      &monc->client->msgr);

	monc->cur_mon = -1;
	monc->had_a_connection = false;
	monc->hunt_mult = 1;

	INIT_DELAYED_WORK(&monc->delayed_work, delayed_work);
	monc->generic_request_tree = RB_ROOT;
	monc->last_tid = 0;

	monc->fs_cluster_id = CEPH_FS_CLUSTER_ID_NONE;

	return 0;

out_auth_reply:
	ceph_msg_put(monc->m_auth_reply);
out_subscribe:
	ceph_msg_put(monc->m_subscribe);
out_subscribe_ack:
	ceph_msg_put(monc->m_subscribe_ack);
out_auth:
	ceph_auth_destroy(monc->auth);
out_monmap:
	kfree(monc->monmap);
out:
	return err;
}
EXPORT_SYMBOL(ceph_monc_init);

void ceph_monc_stop(struct ceph_mon_client *monc)
{
	dout("stop\n");
	cancel_delayed_work_sync(&monc->delayed_work);

	mutex_lock(&monc->mutex);
	__close_session(monc);
	monc->cur_mon = -1;
	mutex_unlock(&monc->mutex);

	/*
	 * flush msgr queue before we destroy ourselves to ensure that:
	 *  - any work that references our embedded con is finished.
	 *  - any osd_client or other work that may reference an authorizer
	 *    finishes before we shut down the auth subsystem.
	 */
	ceph_msgr_flush();

	ceph_auth_destroy(monc->auth);

	WARN_ON(!RB_EMPTY_ROOT(&monc->generic_request_tree));

	ceph_msg_put(monc->m_auth);
	ceph_msg_put(monc->m_auth_reply);
	ceph_msg_put(monc->m_subscribe);
	ceph_msg_put(monc->m_subscribe_ack);

	kfree(monc->monmap);
}
EXPORT_SYMBOL(ceph_monc_stop);

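/*
 * We got through to a monitor -- stop hunting and ease off the
 * hunting backoff.
 */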
static void finish_hunting(struct ceph_mon_client *monc)
{
	if (monc->hunting) {
		dout("%s found mon%d\n", __func__, monc->cur_mon);
		monc->hunting = false;
		monc->had_a_connection = true;
		un_backoff(monc);
		__schedule_delayed(monc);
	}
}

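/*
 * Record the outcome of an authentication attempt: on error, wake up
 * anyone waiting in auth_wq; on the first successful authentication,
 * adopt our global_id and establish the session (send subscriptions,
 * resend pending generic requests).
 */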
static void finish_auth(struct ceph_mon_client *monc, int auth_err,
			bool was_authed)
{
	dout("%s auth_err %d was_authed %d\n", __func__, auth_err, was_authed);
	WARN_ON(auth_err > 0);

	monc->pending_auth = 0;
	if (auth_err) {
		monc->client->auth_err = auth_err;
		wake_up_all(&monc->client->auth_wq);
		return;
	}

	if (!was_authed && ceph_auth_is_authenticated(monc->auth)) {
		dout("%s authenticated, starting session global_id %llu\n",
		     __func__, monc->auth->global_id);

		monc->client->msgr.inst.name.type = CEPH_ENTITY_TYPE_CLIENT;
		monc->client->msgr.inst.name.num =
					cpu_to_le64(monc->auth->global_id);

		__send_subscribe(monc);
		__resend_generic_request(monc);

		pr_info("mon%d %s session established\n", monc->cur_mon,
			ceph_pr_addr(&monc->con.peer_addr));
	}
}

static void handle_auth_reply(struct ceph_mon_client *monc,
			      struct ceph_msg *msg)
{
	bool was_authed;
	int ret;

	mutex_lock(&monc->mutex);
	was_authed = ceph_auth_is_authenticated(monc->auth);
	ret = ceph_handle_auth_reply(monc->auth, msg->front.iov_base,
				     msg->front.iov_len,
				     monc->m_auth->front.iov_base,
				     monc->m_auth->front_alloc_len);
	if (ret > 0) {
		__send_prepared_auth_request(monc, ret);
	} else {
		finish_auth(monc, ret, was_authed);
		finish_hunting(monc);
	}
	mutex_unlock(&monc->mutex);
}

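/*
 * Build and send a new auth request if one is needed (e.g. to renew
 * tickets).  Caller must hold monc->mutex.
 */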
static int __validate_auth(struct ceph_mon_client *monc)
{
	int ret;

	if (monc->pending_auth)
		return 0;

	ret = ceph_build_auth(monc->auth, monc->m_auth->front.iov_base,
			      monc->m_auth->front_alloc_len);
	if (ret <= 0)
		return ret;  /* either an error, or no need to authenticate */
	__send_prepared_auth_request(monc, ret);
	return 0;
}

int ceph_monc_validate_auth(struct ceph_mon_client *monc)
{
	int ret;

	mutex_lock(&monc->mutex);
	ret = __validate_auth(monc);
	mutex_unlock(&monc->mutex);
	return ret;
}
EXPORT_SYMBOL(ceph_monc_validate_auth);

/*
 * handle incoming message
 */
static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
{
	struct ceph_mon_client *monc = con->private;
	int type = le16_to_cpu(msg->hdr.type);

	switch (type) {
	case CEPH_MSG_AUTH_REPLY:
		handle_auth_reply(monc, msg);
		break;

	case CEPH_MSG_MON_SUBSCRIBE_ACK:
		handle_subscribe_ack(monc, msg);
		break;

	case CEPH_MSG_STATFS_REPLY:
		handle_statfs_reply(monc, msg);
		break;

	case CEPH_MSG_MON_GET_VERSION_REPLY:
		handle_get_version_reply(monc, msg);
		break;

	case CEPH_MSG_MON_COMMAND_ACK:
		handle_command_ack(monc, msg);
		break;

	case CEPH_MSG_MON_MAP:
		ceph_monc_handle_map(monc, msg);
		break;

	case CEPH_MSG_OSD_MAP:
		ceph_osdc_handle_map(&monc->client->osdc, msg);
		break;

	default:
		/* can the chained handler handle it? */
		if (monc->client->extra_mon_dispatch &&
		    monc->client->extra_mon_dispatch(monc->client, msg) == 0)
			break;

		pr_err("received unknown message type %d %s\n", type,
		       ceph_msg_type_name(type));
	}
	ceph_msg_put(msg);
}

/*
 * Allocate memory for incoming message
 */
static struct ceph_msg *mon_alloc_msg(struct ceph_connection *con,
				      struct ceph_msg_header *hdr,
				      int *skip)
{
	struct ceph_mon_client *monc = con->private;
	int type = le16_to_cpu(hdr->type);
	int front_len = le32_to_cpu(hdr->front_len);
	struct ceph_msg *m = NULL;

	*skip = 0;

	switch (type) {
	case CEPH_MSG_MON_SUBSCRIBE_ACK:
		m = ceph_msg_get(monc->m_subscribe_ack);
		break;
	case CEPH_MSG_STATFS_REPLY:
	case CEPH_MSG_MON_COMMAND_ACK:
		return get_generic_reply(con, hdr, skip);
	case CEPH_MSG_AUTH_REPLY:
		m = ceph_msg_get(monc->m_auth_reply);
		break;
	case CEPH_MSG_MON_GET_VERSION_REPLY:
		if (le64_to_cpu(hdr->tid) != 0)
			return get_generic_reply(con, hdr, skip);

		/*
		 * Older OSDs don't set reply tid even if the original
		 * request had a non-zero tid.  Work around this weirdness
		 * by allocating a new message.
		 */
		fallthrough;
	case CEPH_MSG_MON_MAP:
	case CEPH_MSG_MDS_MAP:
	case CEPH_MSG_OSD_MAP:
	case CEPH_MSG_FS_MAP_USER:
		m = ceph_msg_new(type, front_len, GFP_NOFS, false);
		if (!m)
			return NULL;	/* ENOMEM--return skip == 0 */
		break;
	}

	if (!m) {
		pr_info("alloc_msg unknown type %d\n", type);
		*skip = 1;
	} else if (front_len > m->front_alloc_len) {
		pr_warn("mon_alloc_msg front %d > prealloc %d (%u#%llu)\n",
			front_len, m->front_alloc_len,
			(unsigned int)con->peer_name.type,
			le64_to_cpu(con->peer_name.num));
		ceph_msg_put(m);
		m = ceph_msg_new(type, front_len, GFP_NOFS, false);
	}

	return m;
}

/*
 * If the monitor connection resets, pick a new monitor and resubmit
 * any pending requests.
 */
static void mon_fault(struct ceph_connection *con)
{
	struct ceph_mon_client *monc = con->private;

	mutex_lock(&monc->mutex);
	dout("%s mon%d\n", __func__, monc->cur_mon);
	if (monc->cur_mon >= 0) {
		if (!monc->hunting) {
			dout("%s hunting for new mon\n", __func__);
			reopen_session(monc);
			__schedule_delayed(monc);
		} else {
			dout("%s already hunting\n", __func__);
		}
	}
	mutex_unlock(&monc->mutex);
}

/*
 * We can ignore refcounting on the connection struct, as all references
 * will come from the messenger workqueue, which is drained prior to
 * mon_client destruction.
 */
static struct ceph_connection *con_get(struct ceph_connection *con)
{
	return con;
}

static void con_put(struct ceph_connection *con)
{
}

static const struct ceph_connection_operations mon_con_ops = {
	.get = con_get,
	.put = con_put,
	.dispatch = dispatch,
	.fault = mon_fault,
	.alloc_msg = mon_alloc_msg,
};