ceph: messenger and osdc changes for rbd
[linux-2.6-block.git] / fs / ceph / osd_client.c
CommitLineData
f24e9980
SW
1#include "ceph_debug.h"
2
3#include <linux/err.h>
4#include <linux/highmem.h>
5#include <linux/mm.h>
6#include <linux/pagemap.h>
7#include <linux/slab.h>
8#include <linux/uaccess.h>
68b4476b
YS
9#ifdef CONFIG_BLOCK
10#include <linux/bio.h>
11#endif
f24e9980
SW
12
13#include "super.h"
14#include "osd_client.h"
15#include "messenger.h"
16#include "decode.h"
4e7a5dcd 17#include "auth.h"
68b4476b 18#include "pagelist.h"
f24e9980 19
c16e7869
SW
20#define OSD_OP_FRONT_LEN 4096
21#define OSD_OPREPLY_FRONT_LEN 512
0d59ab81 22
9e32789f 23static const struct ceph_connection_operations osd_con_ops;
422d2cb8
YS
24static int __kick_requests(struct ceph_osd_client *osdc,
25 struct ceph_osd *kickosd);
f24e9980
SW
26
27static void kick_requests(struct ceph_osd_client *osdc, struct ceph_osd *osd);
28
68b4476b
YS
29static int op_needs_trail(int op)
30{
31 switch (op) {
32 case CEPH_OSD_OP_GETXATTR:
33 case CEPH_OSD_OP_SETXATTR:
34 case CEPH_OSD_OP_CMPXATTR:
35 case CEPH_OSD_OP_CALL:
36 return 1;
37 default:
38 return 0;
39 }
40}
41
42static int op_has_extent(int op)
43{
44 return (op == CEPH_OSD_OP_READ ||
45 op == CEPH_OSD_OP_WRITE);
46}
47
3499e8a5
YS
48void ceph_calc_raw_layout(struct ceph_osd_client *osdc,
49 struct ceph_file_layout *layout,
50 u64 snapid,
68b4476b
YS
51 u64 off, u64 *plen, u64 *bno,
52 struct ceph_osd_request *req,
53 struct ceph_osd_req_op *op)
3499e8a5
YS
54{
55 struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
68b4476b 56 u64 orig_len = *plen;
3499e8a5
YS
57 u64 objoff, objlen; /* extent in object */
58
59 reqhead->snapid = cpu_to_le64(snapid);
60
61 /* object extent? */
68b4476b 62 ceph_calc_file_object_mapping(layout, off, plen, bno,
3499e8a5 63 &objoff, &objlen);
68b4476b 64 if (*plen < orig_len)
3499e8a5 65 dout(" skipping last %llu, final file extent %llu~%llu\n",
68b4476b 66 orig_len - *plen, off, *plen);
3499e8a5 67
68b4476b
YS
68 if (op_has_extent(op->op)) {
69 op->extent.offset = objoff;
70 op->extent.length = objlen;
71 }
72 req->r_num_pages = calc_pages_for(off, *plen);
3499e8a5
YS
73
74 dout("calc_layout bno=%llx %llu~%llu (%d pages)\n",
75 *bno, objoff, objlen, req->r_num_pages);
76
77}
78
f24e9980
SW
79/*
80 * Implement client access to distributed object storage cluster.
81 *
82 * All data objects are stored within a cluster/cloud of OSDs, or
83 * "object storage devices." (Note that Ceph OSDs have _nothing_ to
84 * do with the T10 OSD extensions to SCSI.) Ceph OSDs are simply
85 * remote daemons serving up and coordinating consistent and safe
86 * access to storage.
87 *
88 * Cluster membership and the mapping of data objects onto storage devices
89 * are described by the osd map.
90 *
91 * We keep track of pending OSD requests (read, write), resubmit
92 * requests to different OSDs when the cluster topology/data layout
93 * change, or retry the affected requests when the communications
94 * channel with an OSD is reset.
95 */
96
97/*
98 * calculate the mapping of a file extent onto an object, and fill out the
99 * request accordingly. shorten extent as necessary if it crosses an
100 * object boundary.
101 *
102 * fill osd op in request message.
103 */
104static void calc_layout(struct ceph_osd_client *osdc,
3499e8a5
YS
105 struct ceph_vino vino,
106 struct ceph_file_layout *layout,
f24e9980 107 u64 off, u64 *plen,
68b4476b
YS
108 struct ceph_osd_request *req,
109 struct ceph_osd_req_op *op)
f24e9980 110{
f24e9980
SW
111 u64 bno;
112
68b4476b
YS
113 ceph_calc_raw_layout(osdc, layout, vino.snap, off,
114 plen, &bno, req, op);
f24e9980
SW
115
116 sprintf(req->r_oid, "%llx.%08llx", vino.ino, bno);
117 req->r_oid_len = strlen(req->r_oid);
f24e9980
SW
118}
119
f24e9980
SW
120/*
121 * requests
122 */
415e49a9 123void ceph_osdc_release_request(struct kref *kref)
f24e9980 124{
415e49a9
SW
125 struct ceph_osd_request *req = container_of(kref,
126 struct ceph_osd_request,
127 r_kref);
128
129 if (req->r_request)
130 ceph_msg_put(req->r_request);
131 if (req->r_reply)
132 ceph_msg_put(req->r_reply);
0d59ab81 133 if (req->r_con_filling_msg) {
350b1c32 134 dout("release_request revoking pages %p from con %p\n",
0d59ab81
YS
135 req->r_pages, req->r_con_filling_msg);
136 ceph_con_revoke_message(req->r_con_filling_msg,
137 req->r_reply);
138 ceph_con_put(req->r_con_filling_msg);
350b1c32 139 }
415e49a9
SW
140 if (req->r_own_pages)
141 ceph_release_page_vector(req->r_pages,
142 req->r_num_pages);
68b4476b
YS
143#ifdef CONFIG_BLOCK
144 if (req->r_bio)
145 bio_put(req->r_bio);
146#endif
415e49a9 147 ceph_put_snap_context(req->r_snapc);
68b4476b
YS
148 if (req->r_trail) {
149 ceph_pagelist_release(req->r_trail);
150 kfree(req->r_trail);
151 }
415e49a9
SW
152 if (req->r_mempool)
153 mempool_free(req, req->r_osdc->req_mempool);
154 else
155 kfree(req);
f24e9980
SW
156}
157
68b4476b
YS
/*
 * NOTE(review): a second definition of op_needs_trail() appeared here.
 * It duplicated the copy near the top of this file — two definitions of
 * the same symbol in one translation unit do not compile — and it was
 * also stale: it lacked the CEPH_OSD_OP_CALL case that the earlier copy
 * handles.  The duplicate is removed; the earlier, complete version is
 * the one in use.
 */
169
170static int get_num_ops(struct ceph_osd_req_op *ops, int *needs_trail)
171{
172 int i = 0;
173
174 if (needs_trail)
175 *needs_trail = 0;
176 while (ops[i].op) {
177 if (needs_trail && op_needs_trail(ops[i].op))
178 *needs_trail = 1;
179 i++;
180 }
181
182 return i;
183}
184
3499e8a5
YS
185struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
186 int flags,
f24e9980 187 struct ceph_snap_context *snapc,
68b4476b 188 struct ceph_osd_req_op *ops,
3499e8a5
YS
189 bool use_mempool,
190 gfp_t gfp_flags,
68b4476b
YS
191 struct page **pages,
192 struct bio *bio)
f24e9980
SW
193{
194 struct ceph_osd_request *req;
195 struct ceph_msg *msg;
68b4476b
YS
196 int needs_trail;
197 int num_op = get_num_ops(ops, &needs_trail);
198 size_t msg_size = sizeof(struct ceph_osd_request_head);
3499e8a5 199
68b4476b 200 msg_size += num_op*sizeof(struct ceph_osd_op);
f24e9980
SW
201
202 if (use_mempool) {
3499e8a5 203 req = mempool_alloc(osdc->req_mempool, gfp_flags);
f24e9980
SW
204 memset(req, 0, sizeof(*req));
205 } else {
3499e8a5 206 req = kzalloc(sizeof(*req), gfp_flags);
f24e9980
SW
207 }
208 if (req == NULL)
a79832f2 209 return NULL;
f24e9980 210
f24e9980
SW
211 req->r_osdc = osdc;
212 req->r_mempool = use_mempool;
68b4476b 213
415e49a9 214 kref_init(&req->r_kref);
f24e9980
SW
215 init_completion(&req->r_completion);
216 init_completion(&req->r_safe_completion);
217 INIT_LIST_HEAD(&req->r_unsafe_item);
218 req->r_flags = flags;
219
220 WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0);
221
c16e7869
SW
222 /* create reply message */
223 if (use_mempool)
224 msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
225 else
226 msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY,
3499e8a5 227 OSD_OPREPLY_FRONT_LEN, gfp_flags);
a79832f2 228 if (!msg) {
c16e7869 229 ceph_osdc_put_request(req);
a79832f2 230 return NULL;
c16e7869
SW
231 }
232 req->r_reply = msg;
233
68b4476b
YS
234 /* allocate space for the trailing data */
235 if (needs_trail) {
236 req->r_trail = kmalloc(sizeof(struct ceph_pagelist), gfp_flags);
237 if (!req->r_trail) {
238 ceph_osdc_put_request(req);
239 return NULL;
240 }
241 ceph_pagelist_init(req->r_trail);
242 }
c16e7869 243 /* create request message; allow space for oid */
f24e9980
SW
244 msg_size += 40;
245 if (snapc)
246 msg_size += sizeof(u64) * snapc->num_snaps;
247 if (use_mempool)
8f3bc053 248 msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
f24e9980 249 else
3499e8a5 250 msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, gfp_flags);
a79832f2 251 if (!msg) {
f24e9980 252 ceph_osdc_put_request(req);
a79832f2 253 return NULL;
f24e9980 254 }
68b4476b 255
f24e9980
SW
256 msg->hdr.type = cpu_to_le16(CEPH_MSG_OSD_OP);
257 memset(msg->front.iov_base, 0, msg->front.iov_len);
3499e8a5
YS
258
259 req->r_request = msg;
260 req->r_pages = pages;
68b4476b
YS
261#ifdef CONFIG_BLOCK
262 if (bio) {
263 req->r_bio = bio;
264 bio_get(req->r_bio);
265 }
266#endif
3499e8a5
YS
267
268 return req;
269}
270
68b4476b
YS
271static void osd_req_encode_op(struct ceph_osd_request *req,
272 struct ceph_osd_op *dst,
273 struct ceph_osd_req_op *src)
274{
275 dst->op = cpu_to_le16(src->op);
276
277 switch (dst->op) {
278 case CEPH_OSD_OP_READ:
279 case CEPH_OSD_OP_WRITE:
280 dst->extent.offset =
281 cpu_to_le64(src->extent.offset);
282 dst->extent.length =
283 cpu_to_le64(src->extent.length);
284 dst->extent.truncate_size =
285 cpu_to_le64(src->extent.truncate_size);
286 dst->extent.truncate_seq =
287 cpu_to_le32(src->extent.truncate_seq);
288 break;
289
290 case CEPH_OSD_OP_GETXATTR:
291 case CEPH_OSD_OP_SETXATTR:
292 case CEPH_OSD_OP_CMPXATTR:
293 BUG_ON(!req->r_trail);
294
295 dst->xattr.name_len = cpu_to_le32(src->xattr.name_len);
296 dst->xattr.value_len = cpu_to_le32(src->xattr.value_len);
297 dst->xattr.cmp_op = src->xattr.cmp_op;
298 dst->xattr.cmp_mode = src->xattr.cmp_mode;
299 ceph_pagelist_append(req->r_trail, src->xattr.name,
300 src->xattr.name_len);
301 ceph_pagelist_append(req->r_trail, src->xattr.val,
302 src->xattr.value_len);
303 break;
304 case CEPH_OSD_OP_STARTSYNC:
305 break;
306 default:
307 pr_err("unrecognized osd opcode %d\n", dst->op);
308 WARN_ON(1);
309 break;
310 }
311 dst->payload_len = cpu_to_le32(src->payload_len);
312}
313
3499e8a5
YS
314/*
315 * build new request AND message
316 *
317 */
318void ceph_osdc_build_request(struct ceph_osd_request *req,
68b4476b
YS
319 u64 off, u64 *plen,
320 struct ceph_osd_req_op *src_ops,
321 struct ceph_snap_context *snapc,
322 struct timespec *mtime,
323 const char *oid,
324 int oid_len)
3499e8a5
YS
325{
326 struct ceph_msg *msg = req->r_request;
327 struct ceph_osd_request_head *head;
68b4476b 328 struct ceph_osd_req_op *src_op;
3499e8a5
YS
329 struct ceph_osd_op *op;
330 void *p;
68b4476b 331 int num_op = get_num_ops(src_ops, NULL);
3499e8a5 332 size_t msg_size = sizeof(*head) + num_op*sizeof(*op);
3499e8a5 333 int flags = req->r_flags;
68b4476b
YS
334 u64 data_len = 0;
335 int i;
3499e8a5 336
f24e9980
SW
337 head = msg->front.iov_base;
338 op = (void *)(head + 1);
339 p = (void *)(op + num_op);
340
f24e9980
SW
341 req->r_snapc = ceph_get_snap_context(snapc);
342
343 head->client_inc = cpu_to_le32(1); /* always, for now. */
344 head->flags = cpu_to_le32(flags);
345 if (flags & CEPH_OSD_FLAG_WRITE)
346 ceph_encode_timespec(&head->mtime, mtime);
347 head->num_ops = cpu_to_le16(num_op);
f24e9980 348
f24e9980
SW
349
350 /* fill in oid */
3499e8a5
YS
351 head->object_len = cpu_to_le32(oid_len);
352 memcpy(p, oid, oid_len);
353 p += oid_len;
f24e9980 354
68b4476b
YS
355 src_op = src_ops;
356 while (src_op->op) {
357 osd_req_encode_op(req, op, src_op);
358 src_op++;
f24e9980 359 op++;
f24e9980 360 }
68b4476b
YS
361
362 if (req->r_trail)
363 data_len += req->r_trail->length;
364
f24e9980
SW
365 if (snapc) {
366 head->snap_seq = cpu_to_le64(snapc->seq);
367 head->num_snaps = cpu_to_le32(snapc->num_snaps);
368 for (i = 0; i < snapc->num_snaps; i++) {
369 put_unaligned_le64(snapc->snaps[i], p);
370 p += sizeof(u64);
371 }
372 }
373
68b4476b
YS
374 if (flags & CEPH_OSD_FLAG_WRITE) {
375 req->r_request->hdr.data_off = cpu_to_le16(off);
376 req->r_request->hdr.data_len = cpu_to_le32(*plen + data_len);
377 } else if (data_len) {
378 req->r_request->hdr.data_off = 0;
379 req->r_request->hdr.data_len = cpu_to_le32(data_len);
380 }
381
f24e9980 382 BUG_ON(p > msg->front.iov_base + msg->front.iov_len);
6f863e71
SW
383 msg_size = p - msg->front.iov_base;
384 msg->front.iov_len = msg_size;
385 msg->hdr.front_len = cpu_to_le32(msg_size);
3499e8a5
YS
386 return;
387}
388
389/*
390 * build new request AND message, calculate layout, and adjust file
391 * extent as needed.
392 *
393 * if the file was recently truncated, we include information about its
394 * old and new size so that the object can be updated appropriately. (we
395 * avoid synchronously deleting truncated objects because it's slow.)
396 *
397 * if @do_sync, include a 'startsync' command so that the osd will flush
398 * data quickly.
399 */
400struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
401 struct ceph_file_layout *layout,
402 struct ceph_vino vino,
403 u64 off, u64 *plen,
404 int opcode, int flags,
405 struct ceph_snap_context *snapc,
406 int do_sync,
407 u32 truncate_seq,
408 u64 truncate_size,
409 struct timespec *mtime,
410 bool use_mempool, int num_reply)
411{
68b4476b
YS
412 struct ceph_osd_req_op ops[3];
413 struct ceph_osd_request *req;
414
415 ops[0].op = opcode;
416 ops[0].extent.truncate_seq = truncate_seq;
417 ops[0].extent.truncate_size = truncate_size;
418 ops[0].payload_len = 0;
419
420 if (do_sync) {
421 ops[1].op = CEPH_OSD_OP_STARTSYNC;
422 ops[1].payload_len = 0;
423 ops[2].op = 0;
424 } else
425 ops[1].op = 0;
426
427 req = ceph_osdc_alloc_request(osdc, flags,
428 snapc, ops,
3499e8a5 429 use_mempool,
68b4476b 430 GFP_NOFS, NULL, NULL);
3499e8a5
YS
431 if (IS_ERR(req))
432 return req;
433
434 /* calculate max write size */
68b4476b 435 calc_layout(osdc, vino, layout, off, plen, req, ops);
3499e8a5
YS
436 req->r_file_layout = *layout; /* keep a copy */
437
68b4476b
YS
438 ceph_osdc_build_request(req, off, plen, ops,
439 snapc,
3499e8a5
YS
440 mtime,
441 req->r_oid, req->r_oid_len);
442
f24e9980
SW
443 return req;
444}
445
446/*
447 * We keep osd requests in an rbtree, sorted by ->r_tid.
448 */
449static void __insert_request(struct ceph_osd_client *osdc,
450 struct ceph_osd_request *new)
451{
452 struct rb_node **p = &osdc->requests.rb_node;
453 struct rb_node *parent = NULL;
454 struct ceph_osd_request *req = NULL;
455
456 while (*p) {
457 parent = *p;
458 req = rb_entry(parent, struct ceph_osd_request, r_node);
459 if (new->r_tid < req->r_tid)
460 p = &(*p)->rb_left;
461 else if (new->r_tid > req->r_tid)
462 p = &(*p)->rb_right;
463 else
464 BUG();
465 }
466
467 rb_link_node(&new->r_node, parent, p);
468 rb_insert_color(&new->r_node, &osdc->requests);
469}
470
471static struct ceph_osd_request *__lookup_request(struct ceph_osd_client *osdc,
472 u64 tid)
473{
474 struct ceph_osd_request *req;
475 struct rb_node *n = osdc->requests.rb_node;
476
477 while (n) {
478 req = rb_entry(n, struct ceph_osd_request, r_node);
479 if (tid < req->r_tid)
480 n = n->rb_left;
481 else if (tid > req->r_tid)
482 n = n->rb_right;
483 else
484 return req;
485 }
486 return NULL;
487}
488
489static struct ceph_osd_request *
490__lookup_request_ge(struct ceph_osd_client *osdc,
491 u64 tid)
492{
493 struct ceph_osd_request *req;
494 struct rb_node *n = osdc->requests.rb_node;
495
496 while (n) {
497 req = rb_entry(n, struct ceph_osd_request, r_node);
498 if (tid < req->r_tid) {
499 if (!n->rb_left)
500 return req;
501 n = n->rb_left;
502 } else if (tid > req->r_tid) {
503 n = n->rb_right;
504 } else {
505 return req;
506 }
507 }
508 return NULL;
509}
510
511
512/*
81b024e7 513 * If the osd connection drops, we need to resubmit all requests.
f24e9980
SW
514 */
515static void osd_reset(struct ceph_connection *con)
516{
517 struct ceph_osd *osd = con->private;
518 struct ceph_osd_client *osdc;
519
520 if (!osd)
521 return;
522 dout("osd_reset osd%d\n", osd->o_osd);
523 osdc = osd->o_osdc;
f24e9980
SW
524 down_read(&osdc->map_sem);
525 kick_requests(osdc, osd);
526 up_read(&osdc->map_sem);
527}
528
529/*
530 * Track open sessions with osds.
531 */
532static struct ceph_osd *create_osd(struct ceph_osd_client *osdc)
533{
534 struct ceph_osd *osd;
535
536 osd = kzalloc(sizeof(*osd), GFP_NOFS);
537 if (!osd)
538 return NULL;
539
540 atomic_set(&osd->o_ref, 1);
541 osd->o_osdc = osdc;
542 INIT_LIST_HEAD(&osd->o_requests);
f5a2041b 543 INIT_LIST_HEAD(&osd->o_osd_lru);
f24e9980
SW
544 osd->o_incarnation = 1;
545
546 ceph_con_init(osdc->client->msgr, &osd->o_con);
547 osd->o_con.private = osd;
548 osd->o_con.ops = &osd_con_ops;
549 osd->o_con.peer_name.type = CEPH_ENTITY_TYPE_OSD;
4e7a5dcd 550
422d2cb8 551 INIT_LIST_HEAD(&osd->o_keepalive_item);
f24e9980
SW
552 return osd;
553}
554
555static struct ceph_osd *get_osd(struct ceph_osd *osd)
556{
557 if (atomic_inc_not_zero(&osd->o_ref)) {
558 dout("get_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref)-1,
559 atomic_read(&osd->o_ref));
560 return osd;
561 } else {
562 dout("get_osd %p FAIL\n", osd);
563 return NULL;
564 }
565}
566
567static void put_osd(struct ceph_osd *osd)
568{
569 dout("put_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref),
570 atomic_read(&osd->o_ref) - 1);
79494d1b
SW
571 if (atomic_dec_and_test(&osd->o_ref)) {
572 struct ceph_auth_client *ac = osd->o_osdc->client->monc.auth;
573
574 if (osd->o_authorizer)
575 ac->ops->destroy_authorizer(ac, osd->o_authorizer);
f24e9980 576 kfree(osd);
79494d1b 577 }
f24e9980
SW
578}
579
580/*
581 * remove an osd from our map
582 */
f5a2041b 583static void __remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
f24e9980 584{
f5a2041b 585 dout("__remove_osd %p\n", osd);
f24e9980
SW
586 BUG_ON(!list_empty(&osd->o_requests));
587 rb_erase(&osd->o_node, &osdc->osds);
f5a2041b 588 list_del_init(&osd->o_osd_lru);
f24e9980
SW
589 ceph_con_close(&osd->o_con);
590 put_osd(osd);
591}
592
f5a2041b
YS
593static void __move_osd_to_lru(struct ceph_osd_client *osdc,
594 struct ceph_osd *osd)
595{
596 dout("__move_osd_to_lru %p\n", osd);
597 BUG_ON(!list_empty(&osd->o_osd_lru));
598 list_add_tail(&osd->o_osd_lru, &osdc->osd_lru);
599 osd->lru_ttl = jiffies + osdc->client->mount_args->osd_idle_ttl * HZ;
600}
601
602static void __remove_osd_from_lru(struct ceph_osd *osd)
603{
604 dout("__remove_osd_from_lru %p\n", osd);
605 if (!list_empty(&osd->o_osd_lru))
606 list_del_init(&osd->o_osd_lru);
607}
608
609static void remove_old_osds(struct ceph_osd_client *osdc, int remove_all)
610{
611 struct ceph_osd *osd, *nosd;
612
613 dout("__remove_old_osds %p\n", osdc);
614 mutex_lock(&osdc->request_mutex);
615 list_for_each_entry_safe(osd, nosd, &osdc->osd_lru, o_osd_lru) {
616 if (!remove_all && time_before(jiffies, osd->lru_ttl))
617 break;
618 __remove_osd(osdc, osd);
619 }
620 mutex_unlock(&osdc->request_mutex);
621}
622
f24e9980
SW
623/*
624 * reset osd connect
625 */
f5a2041b 626static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
f24e9980 627{
87b315a5 628 struct ceph_osd_request *req;
f24e9980
SW
629 int ret = 0;
630
f5a2041b 631 dout("__reset_osd %p osd%d\n", osd, osd->o_osd);
f24e9980 632 if (list_empty(&osd->o_requests)) {
f5a2041b 633 __remove_osd(osdc, osd);
87b315a5
SW
634 } else if (memcmp(&osdc->osdmap->osd_addr[osd->o_osd],
635 &osd->o_con.peer_addr,
636 sizeof(osd->o_con.peer_addr)) == 0 &&
637 !ceph_con_opened(&osd->o_con)) {
638 dout(" osd addr hasn't changed and connection never opened,"
639 " letting msgr retry");
640 /* touch each r_stamp for handle_timeout()'s benfit */
641 list_for_each_entry(req, &osd->o_requests, r_osd_item)
642 req->r_stamp = jiffies;
643 ret = -EAGAIN;
f24e9980
SW
644 } else {
645 ceph_con_close(&osd->o_con);
646 ceph_con_open(&osd->o_con, &osdc->osdmap->osd_addr[osd->o_osd]);
647 osd->o_incarnation++;
648 }
649 return ret;
650}
651
652static void __insert_osd(struct ceph_osd_client *osdc, struct ceph_osd *new)
653{
654 struct rb_node **p = &osdc->osds.rb_node;
655 struct rb_node *parent = NULL;
656 struct ceph_osd *osd = NULL;
657
658 while (*p) {
659 parent = *p;
660 osd = rb_entry(parent, struct ceph_osd, o_node);
661 if (new->o_osd < osd->o_osd)
662 p = &(*p)->rb_left;
663 else if (new->o_osd > osd->o_osd)
664 p = &(*p)->rb_right;
665 else
666 BUG();
667 }
668
669 rb_link_node(&new->o_node, parent, p);
670 rb_insert_color(&new->o_node, &osdc->osds);
671}
672
673static struct ceph_osd *__lookup_osd(struct ceph_osd_client *osdc, int o)
674{
675 struct ceph_osd *osd;
676 struct rb_node *n = osdc->osds.rb_node;
677
678 while (n) {
679 osd = rb_entry(n, struct ceph_osd, o_node);
680 if (o < osd->o_osd)
681 n = n->rb_left;
682 else if (o > osd->o_osd)
683 n = n->rb_right;
684 else
685 return osd;
686 }
687 return NULL;
688}
689
422d2cb8
YS
690static void __schedule_osd_timeout(struct ceph_osd_client *osdc)
691{
692 schedule_delayed_work(&osdc->timeout_work,
693 osdc->client->mount_args->osd_keepalive_timeout * HZ);
694}
695
696static void __cancel_osd_timeout(struct ceph_osd_client *osdc)
697{
698 cancel_delayed_work(&osdc->timeout_work);
699}
f24e9980
SW
700
701/*
702 * Register request, assign tid. If this is the first request, set up
703 * the timeout event.
704 */
705static void register_request(struct ceph_osd_client *osdc,
706 struct ceph_osd_request *req)
707{
f24e9980
SW
708 mutex_lock(&osdc->request_mutex);
709 req->r_tid = ++osdc->last_tid;
6df058c0 710 req->r_request->hdr.tid = cpu_to_le64(req->r_tid);
422d2cb8 711 INIT_LIST_HEAD(&req->r_req_lru_item);
f24e9980
SW
712
713 dout("register_request %p tid %lld\n", req, req->r_tid);
714 __insert_request(osdc, req);
715 ceph_osdc_get_request(req);
716 osdc->num_requests++;
717
f24e9980 718 if (osdc->num_requests == 1) {
422d2cb8
YS
719 dout(" first request, scheduling timeout\n");
720 __schedule_osd_timeout(osdc);
f24e9980
SW
721 }
722 mutex_unlock(&osdc->request_mutex);
723}
724
725/*
726 * called under osdc->request_mutex
727 */
728static void __unregister_request(struct ceph_osd_client *osdc,
729 struct ceph_osd_request *req)
730{
731 dout("__unregister_request %p tid %lld\n", req, req->r_tid);
732 rb_erase(&req->r_node, &osdc->requests);
733 osdc->num_requests--;
734
0ba6478d
SW
735 if (req->r_osd) {
736 /* make sure the original request isn't in flight. */
737 ceph_con_revoke(&req->r_osd->o_con, req->r_request);
738
739 list_del_init(&req->r_osd_item);
740 if (list_empty(&req->r_osd->o_requests))
f5a2041b 741 __move_osd_to_lru(osdc, req->r_osd);
0ba6478d
SW
742 req->r_osd = NULL;
743 }
f24e9980
SW
744
745 ceph_osdc_put_request(req);
746
422d2cb8
YS
747 list_del_init(&req->r_req_lru_item);
748 if (osdc->num_requests == 0) {
749 dout(" no requests, canceling timeout\n");
750 __cancel_osd_timeout(osdc);
f24e9980
SW
751 }
752}
753
754/*
755 * Cancel a previously queued request message
756 */
757static void __cancel_request(struct ceph_osd_request *req)
758{
6bc18876 759 if (req->r_sent && req->r_osd) {
f24e9980
SW
760 ceph_con_revoke(&req->r_osd->o_con, req->r_request);
761 req->r_sent = 0;
762 }
422d2cb8 763 list_del_init(&req->r_req_lru_item);
f24e9980
SW
764}
765
766/*
767 * Pick an osd (the first 'up' osd in the pg), allocate the osd struct
768 * (as needed), and set the request r_osd appropriately. If there is
769 * no up osd, set r_osd to NULL.
770 *
771 * Return 0 if unchanged, 1 if changed, or negative on error.
772 *
773 * Caller should hold map_sem for read and request_mutex.
774 */
775static int __map_osds(struct ceph_osd_client *osdc,
776 struct ceph_osd_request *req)
777{
778 struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
51042122 779 struct ceph_pg pgid;
d85b7056
SW
780 int acting[CEPH_PG_MAX_SIZE];
781 int o = -1, num = 0;
f24e9980 782 int err;
f24e9980
SW
783
784 dout("map_osds %p tid %lld\n", req, req->r_tid);
785 err = ceph_calc_object_layout(&reqhead->layout, req->r_oid,
786 &req->r_file_layout, osdc->osdmap);
787 if (err)
788 return err;
51042122 789 pgid = reqhead->layout.ol_pgid;
7740a42f
SW
790 req->r_pgid = pgid;
791
d85b7056
SW
792 err = ceph_calc_pg_acting(osdc->osdmap, pgid, acting);
793 if (err > 0) {
794 o = acting[0];
795 num = err;
796 }
f24e9980
SW
797
798 if ((req->r_osd && req->r_osd->o_osd == o &&
d85b7056
SW
799 req->r_sent >= req->r_osd->o_incarnation &&
800 req->r_num_pg_osds == num &&
801 memcmp(req->r_pg_osds, acting, sizeof(acting[0])*num) == 0) ||
f24e9980
SW
802 (req->r_osd == NULL && o == -1))
803 return 0; /* no change */
804
51042122
SW
805 dout("map_osds tid %llu pgid %d.%x osd%d (was osd%d)\n",
806 req->r_tid, le32_to_cpu(pgid.pool), le16_to_cpu(pgid.ps), o,
f24e9980
SW
807 req->r_osd ? req->r_osd->o_osd : -1);
808
d85b7056
SW
809 /* record full pg acting set */
810 memcpy(req->r_pg_osds, acting, sizeof(acting[0]) * num);
811 req->r_num_pg_osds = num;
812
f24e9980
SW
813 if (req->r_osd) {
814 __cancel_request(req);
815 list_del_init(&req->r_osd_item);
f24e9980
SW
816 req->r_osd = NULL;
817 }
818
819 req->r_osd = __lookup_osd(osdc, o);
820 if (!req->r_osd && o >= 0) {
c99eb1c7
SW
821 err = -ENOMEM;
822 req->r_osd = create_osd(osdc);
823 if (!req->r_osd)
824 goto out;
f24e9980
SW
825
826 dout("map_osds osd %p is osd%d\n", req->r_osd, o);
827 req->r_osd->o_osd = o;
828 req->r_osd->o_con.peer_name.num = cpu_to_le64(o);
829 __insert_osd(osdc, req->r_osd);
830
831 ceph_con_open(&req->r_osd->o_con, &osdc->osdmap->osd_addr[o]);
832 }
833
f5a2041b
YS
834 if (req->r_osd) {
835 __remove_osd_from_lru(req->r_osd);
f24e9980 836 list_add(&req->r_osd_item, &req->r_osd->o_requests);
f5a2041b 837 }
d85b7056 838 err = 1; /* osd or pg changed */
f24e9980
SW
839
840out:
f24e9980
SW
841 return err;
842}
843
844/*
845 * caller should hold map_sem (for read) and request_mutex
846 */
847static int __send_request(struct ceph_osd_client *osdc,
848 struct ceph_osd_request *req)
849{
850 struct ceph_osd_request_head *reqhead;
851 int err;
852
853 err = __map_osds(osdc, req);
854 if (err < 0)
855 return err;
856 if (req->r_osd == NULL) {
857 dout("send_request %p no up osds in pg\n", req);
858 ceph_monc_request_next_osdmap(&osdc->client->monc);
859 return 0;
860 }
861
862 dout("send_request %p tid %llu to osd%d flags %d\n",
863 req, req->r_tid, req->r_osd->o_osd, req->r_flags);
864
865 reqhead = req->r_request->front.iov_base;
866 reqhead->osdmap_epoch = cpu_to_le32(osdc->osdmap->epoch);
867 reqhead->flags |= cpu_to_le32(req->r_flags); /* e.g., RETRY */
868 reqhead->reassert_version = req->r_reassert_version;
869
3dd72fc0 870 req->r_stamp = jiffies;
07a27e22 871 list_move_tail(&req->r_req_lru_item, &osdc->req_lru);
f24e9980
SW
872
873 ceph_msg_get(req->r_request); /* send consumes a ref */
874 ceph_con_send(&req->r_osd->o_con, req->r_request);
875 req->r_sent = req->r_osd->o_incarnation;
876 return 0;
877}
878
879/*
880 * Timeout callback, called every N seconds when 1 or more osd
881 * requests has been active for more than N seconds. When this
882 * happens, we ping all OSDs with requests who have timed out to
883 * ensure any communications channel reset is detected. Reset the
884 * request timeouts another N seconds in the future as we go.
885 * Reschedule the timeout event another N seconds in future (unless
886 * there are no open requests).
887 */
888static void handle_timeout(struct work_struct *work)
889{
890 struct ceph_osd_client *osdc =
891 container_of(work, struct ceph_osd_client, timeout_work.work);
422d2cb8 892 struct ceph_osd_request *req, *last_req = NULL;
f24e9980 893 struct ceph_osd *osd;
6b805185 894 unsigned long timeout = osdc->client->mount_args->osd_timeout * HZ;
422d2cb8
YS
895 unsigned long keepalive =
896 osdc->client->mount_args->osd_keepalive_timeout * HZ;
3dd72fc0 897 unsigned long last_stamp = 0;
f24e9980 898 struct rb_node *p;
422d2cb8 899 struct list_head slow_osds;
f24e9980
SW
900
901 dout("timeout\n");
902 down_read(&osdc->map_sem);
903
904 ceph_monc_request_next_osdmap(&osdc->client->monc);
905
906 mutex_lock(&osdc->request_mutex);
907 for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
908 req = rb_entry(p, struct ceph_osd_request, r_node);
909
910 if (req->r_resend) {
911 int err;
912
913 dout("osdc resending prev failed %lld\n", req->r_tid);
914 err = __send_request(osdc, req);
915 if (err)
916 dout("osdc failed again on %lld\n", req->r_tid);
917 else
918 req->r_resend = false;
919 continue;
920 }
921 }
f24e9980 922
422d2cb8
YS
923 /*
924 * reset osds that appear to be _really_ unresponsive. this
925 * is a failsafe measure.. we really shouldn't be getting to
926 * this point if the system is working properly. the monitors
927 * should mark the osd as failed and we should find out about
928 * it from an updated osd map.
929 */
f26e681d 930 while (timeout && !list_empty(&osdc->req_lru)) {
422d2cb8
YS
931 req = list_entry(osdc->req_lru.next, struct ceph_osd_request,
932 r_req_lru_item);
933
3dd72fc0 934 if (time_before(jiffies, req->r_stamp + timeout))
422d2cb8
YS
935 break;
936
3dd72fc0 937 BUG_ON(req == last_req && req->r_stamp == last_stamp);
422d2cb8 938 last_req = req;
3dd72fc0 939 last_stamp = req->r_stamp;
422d2cb8
YS
940
941 osd = req->r_osd;
942 BUG_ON(!osd);
943 pr_warning(" tid %llu timed out on osd%d, will reset osd\n",
944 req->r_tid, osd->o_osd);
945 __kick_requests(osdc, osd);
946 }
947
948 /*
949 * ping osds that are a bit slow. this ensures that if there
950 * is a break in the TCP connection we will notice, and reopen
951 * a connection with that osd (from the fault callback).
952 */
953 INIT_LIST_HEAD(&slow_osds);
954 list_for_each_entry(req, &osdc->req_lru, r_req_lru_item) {
3dd72fc0 955 if (time_before(jiffies, req->r_stamp + keepalive))
422d2cb8
YS
956 break;
957
958 osd = req->r_osd;
959 BUG_ON(!osd);
960 dout(" tid %llu is slow, will send keepalive on osd%d\n",
f24e9980 961 req->r_tid, osd->o_osd);
422d2cb8
YS
962 list_move_tail(&osd->o_keepalive_item, &slow_osds);
963 }
964 while (!list_empty(&slow_osds)) {
965 osd = list_entry(slow_osds.next, struct ceph_osd,
966 o_keepalive_item);
967 list_del_init(&osd->o_keepalive_item);
f24e9980
SW
968 ceph_con_keepalive(&osd->o_con);
969 }
970
422d2cb8 971 __schedule_osd_timeout(osdc);
f24e9980
SW
972 mutex_unlock(&osdc->request_mutex);
973
974 up_read(&osdc->map_sem);
975}
976
f5a2041b
YS
977static void handle_osds_timeout(struct work_struct *work)
978{
979 struct ceph_osd_client *osdc =
980 container_of(work, struct ceph_osd_client,
981 osds_timeout_work.work);
982 unsigned long delay =
983 osdc->client->mount_args->osd_idle_ttl * HZ >> 2;
984
985 dout("osds timeout\n");
986 down_read(&osdc->map_sem);
987 remove_old_osds(osdc, 0);
988 up_read(&osdc->map_sem);
989
990 schedule_delayed_work(&osdc->osds_timeout_work,
991 round_jiffies_relative(delay));
992}
993
f24e9980
SW
994/*
995 * handle osd op reply. either call the callback if it is specified,
996 * or do the completion to wake up the waiting thread.
997 */
350b1c32
SW
static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
			 struct ceph_connection *con)
{
	struct ceph_osd_reply_head *rhead = msg->front.iov_base;
	struct ceph_osd_request *req;
	u64 tid;
	int numops, object_len, flags;
	s32 result;

	tid = le64_to_cpu(msg->hdr.tid);
	if (msg->front.iov_len < sizeof(*rhead))
		goto bad;
	numops = le32_to_cpu(rhead->num_ops);
	object_len = le32_to_cpu(rhead->object_len);
	result = le32_to_cpu(rhead->result);
	/* front must be exactly header + object name + per-op array */
	if (msg->front.iov_len != sizeof(*rhead) + object_len +
	    numops * sizeof(struct ceph_osd_op))
		goto bad;
	dout("handle_reply %p tid %llu result %d\n", msg, tid, (int)result);

	/* lookup the request this reply belongs to */
	mutex_lock(&osdc->request_mutex);
	req = __lookup_request(osdc, tid);
	if (req == NULL) {
		/* request was already unregistered (e.g. dup safe reply) */
		dout("handle_reply tid %llu dne\n", tid);
		mutex_unlock(&osdc->request_mutex);
		return;
	}
	ceph_osdc_get_request(req);
	flags = le32_to_cpu(rhead->flags);

	/*
	 * if this connection filled our message, drop our reference now, to
	 * avoid a (safe but slower) revoke later.
	 */
	if (req->r_con_filling_msg == con && req->r_reply == msg) {
		dout(" dropping con_filling_msg ref %p\n", con);
		req->r_con_filling_msg = NULL;
		ceph_con_put(con);
	}

	if (!req->r_got_reply) {
		/* first reply for this tid: record result and data length */
		unsigned bytes;

		req->r_result = le32_to_cpu(rhead->result);
		bytes = le32_to_cpu(msg->hdr.data_len);
		dout("handle_reply result %d bytes %d\n", req->r_result,
		     bytes);
		/* a successful read reports the number of bytes returned */
		if (req->r_result == 0)
			req->r_result = bytes;

		/* in case this is a write and we need to replay, */
		req->r_reassert_version = rhead->reassert_version;

		req->r_got_reply = 1;
	} else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) {
		/* already saw an ack and this isn't the safe reply: ignore */
		dout("handle_reply tid %llu dup ack\n", tid);
		mutex_unlock(&osdc->request_mutex);
		goto done;
	}

	dout("handle_reply tid %llu flags %d\n", tid, flags);

	/* either this is a read, or we got the safe response */
	if (result < 0 ||
	    (flags & CEPH_OSD_FLAG_ONDISK) ||
	    ((flags & CEPH_OSD_FLAG_WRITE) == 0))
		__unregister_request(osdc, req);

	mutex_unlock(&osdc->request_mutex);

	/* notify the waiter/submitter (ack or first reply) */
	if (req->r_callback)
		req->r_callback(req, msg);
	else
		complete_all(&req->r_completion);

	if (flags & CEPH_OSD_FLAG_ONDISK) {
		if (req->r_safe_callback)
			req->r_safe_callback(req, msg);
		complete_all(&req->r_safe_completion);  /* fsync waiter */
	}

done:
	ceph_osdc_put_request(req);	/* drop the ref taken after lookup */
	return;

bad:
	pr_err("corrupt osd_op_reply got %d %d expected %d\n",
	       (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len),
	       (int)sizeof(*rhead));
	ceph_msg_dump(msg);
}
1090
1091
/*
 * Re-examine all registered requests and resend any whose osd session
 * was reset or whose r_resend flag is set.  If @kickosd is non-NULL,
 * only that osd's session is reset; otherwise every osd that is down
 * (or whose address changed in the osdmap) is reset.
 *
 * Returns the number of requests that currently map to no usable osd
 * (caller should request a newer map), or 1 if resetting @kickosd
 * returned -EAGAIN and the kick should be retried.
 *
 * Caller must hold request_mutex.
 */
static int __kick_requests(struct ceph_osd_client *osdc,
			  struct ceph_osd *kickosd)
{
	struct ceph_osd_request *req;
	struct rb_node *p, *n;
	int needmap = 0;
	int err;

	dout("kick_requests osd%d\n", kickosd ? kickosd->o_osd : -1);
	if (kickosd) {
		err = __reset_osd(osdc, kickosd);
		if (err == -EAGAIN)
			return 1;
	} else {
		for (p = rb_first(&osdc->osds); p; p = n) {
			struct ceph_osd *osd =
				rb_entry(p, struct ceph_osd, o_node);

			/* fetch next before __reset_osd may remove the node */
			n = rb_next(p);
			if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) ||
			    memcmp(&osd->o_con.peer_addr,
				   ceph_osd_addr(osdc->osdmap,
						 osd->o_osd),
				   sizeof(struct ceph_entity_addr)) != 0)
				__reset_osd(osdc, osd);
		}
	}

	for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
		req = rb_entry(p, struct ceph_osd_request, r_node);

		if (req->r_resend) {
			dout(" r_resend set on tid %llu\n", req->r_tid);
			__cancel_request(req);
			goto kick;
		}
		if (req->r_osd && kickosd == req->r_osd) {
			__cancel_request(req);
			goto kick;
		}

		err = __map_osds(osdc, req);
		if (err == 0)
			continue;	/* no change */
		if (err < 0) {
			/*
			 * FIXME: really, we should set the request
			 * error and fail if this isn't a 'nofail'
			 * request, but that's a fair bit more
			 * complicated to do.  So retry!
			 */
			dout(" setting r_resend on %llu\n", req->r_tid);
			req->r_resend = true;
			continue;
		}
		if (req->r_osd == NULL) {
			dout("tid %llu maps to no valid osd\n", req->r_tid);
			needmap++;	/* request a newer map */
			continue;
		}

kick:
		dout("kicking %p tid %llu osd%d\n", req, req->r_tid,
		     req->r_osd ? req->r_osd->o_osd : -1);
		req->r_flags |= CEPH_OSD_FLAG_RETRY;
		err = __send_request(osdc, req);
		if (err) {
			/* couldn't send now; try again on the next kick */
			dout(" setting r_resend on %llu\n", req->r_tid);
			req->r_resend = true;
		}
	}

	return needmap;
}
1166
/*
 * Resubmit osd requests whose osd or osd address has changed.  Request
 * a new osd map if osds are down, or we are otherwise unable to determine
 * how to direct a request.
 *
 * Close connections to down osds.
 *
 * If @kickosd is specified, resubmit requests for that specific osd.
 *
 * Caller should hold map_sem for read; request_mutex is taken here
 * (not by the caller).
 */
static void kick_requests(struct ceph_osd_client *osdc,
			  struct ceph_osd *kickosd)
{
	int needmap;

	mutex_lock(&osdc->request_mutex);
	needmap = __kick_requests(osdc, kickosd);
	mutex_unlock(&osdc->request_mutex);

	if (needmap) {
		/* some requests map to no usable osd: ask for a newer map */
		dout("%d requests for down osds, need new map\n", needmap);
		ceph_monc_request_next_osdmap(&osdc->client->monc);
	}
}
f24e9980
SW
1193/*
1194 * Process updated osd map.
1195 *
1196 * The message contains any number of incremental and full maps, normally
1197 * indicating some sort of topology change in the cluster. Kick requests
1198 * off to different OSDs as needed.
1199 */
1200void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
1201{
1202 void *p, *end, *next;
1203 u32 nr_maps, maplen;
1204 u32 epoch;
1205 struct ceph_osdmap *newmap = NULL, *oldmap;
1206 int err;
1207 struct ceph_fsid fsid;
1208
1209 dout("handle_map have %u\n", osdc->osdmap ? osdc->osdmap->epoch : 0);
1210 p = msg->front.iov_base;
1211 end = p + msg->front.iov_len;
1212
1213 /* verify fsid */
1214 ceph_decode_need(&p, end, sizeof(fsid), bad);
1215 ceph_decode_copy(&p, &fsid, sizeof(fsid));
0743304d
SW
1216 if (ceph_check_fsid(osdc->client, &fsid) < 0)
1217 return;
f24e9980
SW
1218
1219 down_write(&osdc->map_sem);
1220
1221 /* incremental maps */
1222 ceph_decode_32_safe(&p, end, nr_maps, bad);
1223 dout(" %d inc maps\n", nr_maps);
1224 while (nr_maps > 0) {
1225 ceph_decode_need(&p, end, 2*sizeof(u32), bad);
c89136ea
SW
1226 epoch = ceph_decode_32(&p);
1227 maplen = ceph_decode_32(&p);
f24e9980
SW
1228 ceph_decode_need(&p, end, maplen, bad);
1229 next = p + maplen;
1230 if (osdc->osdmap && osdc->osdmap->epoch+1 == epoch) {
1231 dout("applying incremental map %u len %d\n",
1232 epoch, maplen);
1233 newmap = osdmap_apply_incremental(&p, next,
1234 osdc->osdmap,
1235 osdc->client->msgr);
1236 if (IS_ERR(newmap)) {
1237 err = PTR_ERR(newmap);
1238 goto bad;
1239 }
30dc6381 1240 BUG_ON(!newmap);
f24e9980
SW
1241 if (newmap != osdc->osdmap) {
1242 ceph_osdmap_destroy(osdc->osdmap);
1243 osdc->osdmap = newmap;
1244 }
1245 } else {
1246 dout("ignoring incremental map %u len %d\n",
1247 epoch, maplen);
1248 }
1249 p = next;
1250 nr_maps--;
1251 }
1252 if (newmap)
1253 goto done;
1254
1255 /* full maps */
1256 ceph_decode_32_safe(&p, end, nr_maps, bad);
1257 dout(" %d full maps\n", nr_maps);
1258 while (nr_maps) {
1259 ceph_decode_need(&p, end, 2*sizeof(u32), bad);
c89136ea
SW
1260 epoch = ceph_decode_32(&p);
1261 maplen = ceph_decode_32(&p);
f24e9980
SW
1262 ceph_decode_need(&p, end, maplen, bad);
1263 if (nr_maps > 1) {
1264 dout("skipping non-latest full map %u len %d\n",
1265 epoch, maplen);
1266 } else if (osdc->osdmap && osdc->osdmap->epoch >= epoch) {
1267 dout("skipping full map %u len %d, "
1268 "older than our %u\n", epoch, maplen,
1269 osdc->osdmap->epoch);
1270 } else {
1271 dout("taking full map %u len %d\n", epoch, maplen);
1272 newmap = osdmap_decode(&p, p+maplen);
1273 if (IS_ERR(newmap)) {
1274 err = PTR_ERR(newmap);
1275 goto bad;
1276 }
30dc6381 1277 BUG_ON(!newmap);
f24e9980
SW
1278 oldmap = osdc->osdmap;
1279 osdc->osdmap = newmap;
1280 if (oldmap)
1281 ceph_osdmap_destroy(oldmap);
1282 }
1283 p += maplen;
1284 nr_maps--;
1285 }
1286
1287done:
1288 downgrade_write(&osdc->map_sem);
1289 ceph_monc_got_osdmap(&osdc->client->monc, osdc->osdmap->epoch);
1290 if (newmap)
1291 kick_requests(osdc, NULL);
1292 up_read(&osdc->map_sem);
03066f23 1293 wake_up_all(&osdc->client->auth_wq);
f24e9980
SW
1294 return;
1295
1296bad:
1297 pr_err("osdc handle_map corrupt msg\n");
9ec7cab1 1298 ceph_msg_dump(msg);
f24e9980
SW
1299 up_write(&osdc->map_sem);
1300 return;
1301}
1302
f24e9980
SW
/*
 * Register request, send initial attempt.
 *
 * With @nofail, a send failure is hidden from the caller: the request
 * stays registered with r_resend set and returns 0; otherwise the
 * request is unregistered and the error is returned.
 */
int ceph_osdc_start_request(struct ceph_osd_client *osdc,
			    struct ceph_osd_request *req,
			    bool nofail)
{
	int rc = 0;

	/* attach the request's data payload to the outgoing message */
	req->r_request->pages = req->r_pages;
	req->r_request->nr_pages = req->r_num_pages;
#ifdef CONFIG_BLOCK
	req->r_request->bio = req->r_bio;
#endif
	req->r_request->trail = req->r_trail;

	register_request(osdc, req);

	down_read(&osdc->map_sem);
	mutex_lock(&osdc->request_mutex);
	/*
	 * a racing kick_requests() may have sent the message for us
	 * while we dropped request_mutex above, so only send now if
	 * the request still hasn't been touched yet.
	 */
	if (req->r_sent == 0) {
		rc = __send_request(osdc, req);
		if (rc) {
			if (nofail) {
				/* caller can't cope with failure: mark for
				 * resend on the next kick and report success */
				dout("osdc_start_request failed send, "
				     " marking %lld\n", req->r_tid);
				req->r_resend = true;
				rc = 0;
			} else {
				__unregister_request(osdc, req);
			}
		}
	}
	mutex_unlock(&osdc->request_mutex);
	up_read(&osdc->map_sem);
	return rc;
}
1345
1346/*
1347 * wait for a request to complete
1348 */
1349int ceph_osdc_wait_request(struct ceph_osd_client *osdc,
1350 struct ceph_osd_request *req)
1351{
1352 int rc;
1353
1354 rc = wait_for_completion_interruptible(&req->r_completion);
1355 if (rc < 0) {
1356 mutex_lock(&osdc->request_mutex);
1357 __cancel_request(req);
529cfcc4 1358 __unregister_request(osdc, req);
f24e9980 1359 mutex_unlock(&osdc->request_mutex);
529cfcc4 1360 dout("wait_request tid %llu canceled/timed out\n", req->r_tid);
f24e9980
SW
1361 return rc;
1362 }
1363
1364 dout("wait_request tid %llu result %d\n", req->r_tid, req->r_result);
1365 return req->r_result;
1366}
1367
/*
 * sync - wait for all in-flight requests to flush.  avoid starvation.
 */
void ceph_osdc_sync(struct ceph_osd_client *osdc)
{
	struct ceph_osd_request *req;
	u64 last_tid, next_tid = 0;

	mutex_lock(&osdc->request_mutex);
	/* only wait for tids submitted before we started; requests that
	 * arrive during the sync are ignored (this avoids starvation) */
	last_tid = osdc->last_tid;
	while (1) {
		req = __lookup_request_ge(osdc, next_tid);
		if (!req)
			break;
		if (req->r_tid > last_tid)
			break;

		next_tid = req->r_tid + 1;
		/* reads have nothing to flush */
		if ((req->r_flags & CEPH_OSD_FLAG_WRITE) == 0)
			continue;

		/* take a ref so req stays valid while we sleep unlocked */
		ceph_osdc_get_request(req);
		mutex_unlock(&osdc->request_mutex);
		dout("sync waiting on tid %llu (last is %llu)\n",
		     req->r_tid, last_tid);
		wait_for_completion(&req->r_safe_completion);
		mutex_lock(&osdc->request_mutex);
		ceph_osdc_put_request(req);
	}
	mutex_unlock(&osdc->request_mutex);
	dout("sync done (thru tid %llu)\n", last_tid);
}
1400
/*
 * init, shutdown
 */
int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
{
	int err;

	dout("init\n");
	osdc->client = client;
	osdc->osdmap = NULL;		/* filled in by the first osdmap msg */
	init_rwsem(&osdc->map_sem);
	init_completion(&osdc->map_waiters);
	osdc->last_requested_map = 0;
	mutex_init(&osdc->request_mutex);
	osdc->last_tid = 0;
	osdc->osds = RB_ROOT;
	INIT_LIST_HEAD(&osdc->osd_lru);
	osdc->requests = RB_ROOT;
	INIT_LIST_HEAD(&osdc->req_lru);
	osdc->num_requests = 0;
	INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
	INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);

	/* periodically reap idle osd sessions */
	schedule_delayed_work(&osdc->osds_timeout_work,
	   round_jiffies_relative(osdc->client->mount_args->osd_idle_ttl * HZ));

	err = -ENOMEM;
	/* mempool so 'nofail' (writeback) requests can always be allocated */
	osdc->req_mempool = mempool_create_kmalloc_pool(10,
					sizeof(struct ceph_osd_request));
	if (!osdc->req_mempool)
		goto out;

	err = ceph_msgpool_init(&osdc->msgpool_op, OSD_OP_FRONT_LEN, 10, true,
				"osd_op");
	if (err < 0)
		goto out_mempool;
	err = ceph_msgpool_init(&osdc->msgpool_op_reply,
				OSD_OPREPLY_FRONT_LEN, 10, true,
				"osd_op_reply");
	if (err < 0)
		goto out_msgpool;
	return 0;

out_msgpool:
	ceph_msgpool_destroy(&osdc->msgpool_op);
out_mempool:
	mempool_destroy(osdc->req_mempool);
out:
	return err;
}
1451
void ceph_osdc_stop(struct ceph_osd_client *osdc)
{
	/* stop the timers first so nothing fires during teardown */
	cancel_delayed_work_sync(&osdc->timeout_work);
	cancel_delayed_work_sync(&osdc->osds_timeout_work);
	if (osdc->osdmap) {
		ceph_osdmap_destroy(osdc->osdmap);
		osdc->osdmap = NULL;
	}
	/* NOTE(review): second arg presumably forces removal of all osds,
	 * not just aged-out ones — confirm against remove_old_osds() */
	remove_old_osds(osdc, 1);
	mempool_destroy(osdc->req_mempool);
	ceph_msgpool_destroy(&osdc->msgpool_op);
	ceph_msgpool_destroy(&osdc->msgpool_op_reply);
}
1465
/*
 * Read some contiguous pages.  If we cross a stripe boundary, shorten
 * *plen.  Return number of bytes read, or error.
 */
int ceph_osdc_readpages(struct ceph_osd_client *osdc,
			struct ceph_vino vino, struct ceph_file_layout *layout,
			u64 off, u64 *plen,
			u32 truncate_seq, u64 truncate_size,
			struct page **pages, int num_pages)
{
	struct ceph_osd_request *req;
	int rc = 0;

	dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino,
	     vino.snap, off, *plen);
	/* new_request may trim *plen to the object boundary */
	req = ceph_osdc_new_request(osdc, layout, vino, off, plen,
				    CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
				    NULL, 0, truncate_seq, truncate_size, NULL,
				    false, 1);
	if (!req)
		return -ENOMEM;

	/* it may be a short read due to an object boundary */
	req->r_pages = pages;

	dout("readpages final extent is %llu~%llu (%d pages)\n",
	     off, *plen, req->r_num_pages);

	/* submit and wait synchronously for completion */
	rc = ceph_osdc_start_request(osdc, req, false);
	if (!rc)
		rc = ceph_osdc_wait_request(osdc, req);

	ceph_osdc_put_request(req);
	dout("readpages result %d\n", rc);
	return rc;
}
1502
/*
 * do a synchronous write on N pages
 *
 * Returns the number of bytes written (len, possibly shortened at an
 * object boundary by new_request) or a negative error.
 */
int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
			 struct ceph_file_layout *layout,
			 struct ceph_snap_context *snapc,
			 u64 off, u64 len,
			 u32 truncate_seq, u64 truncate_size,
			 struct timespec *mtime,
			 struct page **pages, int num_pages,
			 int flags, int do_sync, bool nofail)
{
	struct ceph_osd_request *req;
	int rc = 0;

	/* writes are never directed at a snapshot */
	BUG_ON(vino.snap != CEPH_NOSNAP);
	req = ceph_osdc_new_request(osdc, layout, vino, off, &len,
				    CEPH_OSD_OP_WRITE,
				    flags | CEPH_OSD_FLAG_ONDISK |
					    CEPH_OSD_FLAG_WRITE,
				    snapc, do_sync,
				    truncate_seq, truncate_size, mtime,
				    nofail, 1);
	if (!req)
		return -ENOMEM;

	/* it may be a short write due to an object boundary */
	req->r_pages = pages;
	dout("writepages %llu~%llu (%d pages)\n", off, len,
	     req->r_num_pages);

	rc = ceph_osdc_start_request(osdc, req, nofail);
	if (!rc)
		rc = ceph_osdc_wait_request(osdc, req);

	ceph_osdc_put_request(req);
	if (rc == 0)
		rc = len;
	dout("writepages result %d\n", rc);
	return rc;
}
1544
1545/*
1546 * handle incoming message
1547 */
1548static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
1549{
1550 struct ceph_osd *osd = con->private;
32c895e7 1551 struct ceph_osd_client *osdc;
f24e9980
SW
1552 int type = le16_to_cpu(msg->hdr.type);
1553
1554 if (!osd)
4a32f93d 1555 goto out;
32c895e7 1556 osdc = osd->o_osdc;
f24e9980
SW
1557
1558 switch (type) {
1559 case CEPH_MSG_OSD_MAP:
1560 ceph_osdc_handle_map(osdc, msg);
1561 break;
1562 case CEPH_MSG_OSD_OPREPLY:
350b1c32 1563 handle_reply(osdc, msg, con);
f24e9980
SW
1564 break;
1565
1566 default:
1567 pr_err("received unknown message type %d %s\n", type,
1568 ceph_msg_type_name(type));
1569 }
4a32f93d 1570out:
f24e9980
SW
1571 ceph_msg_put(msg);
1572}
1573
5b3a4db3 1574/*
21b667f6
SW
1575 * lookup and return message for incoming reply. set up reply message
1576 * pages.
5b3a4db3
SW
1577 */
1578static struct ceph_msg *get_reply(struct ceph_connection *con,
2450418c
YS
1579 struct ceph_msg_header *hdr,
1580 int *skip)
f24e9980
SW
1581{
1582 struct ceph_osd *osd = con->private;
1583 struct ceph_osd_client *osdc = osd->o_osdc;
2450418c 1584 struct ceph_msg *m;
0547a9b3 1585 struct ceph_osd_request *req;
5b3a4db3
SW
1586 int front = le32_to_cpu(hdr->front_len);
1587 int data_len = le32_to_cpu(hdr->data_len);
0547a9b3 1588 u64 tid;
f24e9980 1589
0547a9b3
YS
1590 tid = le64_to_cpu(hdr->tid);
1591 mutex_lock(&osdc->request_mutex);
1592 req = __lookup_request(osdc, tid);
1593 if (!req) {
1594 *skip = 1;
1595 m = NULL;
c16e7869 1596 pr_info("get_reply unknown tid %llu from osd%d\n", tid,
5b3a4db3 1597 osd->o_osd);
0547a9b3
YS
1598 goto out;
1599 }
c16e7869
SW
1600
1601 if (req->r_con_filling_msg) {
1602 dout("get_reply revoking msg %p from old con %p\n",
1603 req->r_reply, req->r_con_filling_msg);
1604 ceph_con_revoke_message(req->r_con_filling_msg, req->r_reply);
1605 ceph_con_put(req->r_con_filling_msg);
6f46cb29 1606 req->r_con_filling_msg = NULL;
0547a9b3
YS
1607 }
1608
c16e7869
SW
1609 if (front > req->r_reply->front.iov_len) {
1610 pr_warning("get_reply front %d > preallocated %d\n",
1611 front, (int)req->r_reply->front.iov_len);
34d23762 1612 m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front, GFP_NOFS);
a79832f2 1613 if (!m)
c16e7869
SW
1614 goto out;
1615 ceph_msg_put(req->r_reply);
1616 req->r_reply = m;
1617 }
1618 m = ceph_msg_get(req->r_reply);
1619
0547a9b3 1620 if (data_len > 0) {
21b667f6
SW
1621 unsigned data_off = le16_to_cpu(hdr->data_off);
1622 int want = calc_pages_for(data_off & ~PAGE_MASK, data_len);
1623
1624 if (unlikely(req->r_num_pages < want)) {
1625 pr_warning("tid %lld reply %d > expected %d pages\n",
1626 tid, want, m->nr_pages);
0547a9b3
YS
1627 *skip = 1;
1628 ceph_msg_put(m);
a79832f2 1629 m = NULL;
21b667f6 1630 goto out;
0547a9b3 1631 }
21b667f6
SW
1632 m->pages = req->r_pages;
1633 m->nr_pages = req->r_num_pages;
68b4476b
YS
1634#ifdef CONFIG_BLOCK
1635 m->bio = req->r_bio;
1636#endif
0547a9b3 1637 }
5b3a4db3 1638 *skip = 0;
c16e7869
SW
1639 req->r_con_filling_msg = ceph_con_get(con);
1640 dout("get_reply tid %lld %p\n", tid, m);
0547a9b3
YS
1641
1642out:
1643 mutex_unlock(&osdc->request_mutex);
2450418c 1644 return m;
5b3a4db3
SW
1645
1646}
1647
1648static struct ceph_msg *alloc_msg(struct ceph_connection *con,
1649 struct ceph_msg_header *hdr,
1650 int *skip)
1651{
1652 struct ceph_osd *osd = con->private;
1653 int type = le16_to_cpu(hdr->type);
1654 int front = le32_to_cpu(hdr->front_len);
1655
1656 switch (type) {
1657 case CEPH_MSG_OSD_MAP:
34d23762 1658 return ceph_msg_new(type, front, GFP_NOFS);
5b3a4db3
SW
1659 case CEPH_MSG_OSD_OPREPLY:
1660 return get_reply(con, hdr, skip);
1661 default:
1662 pr_info("alloc_msg unexpected msg type %d from osd%d\n", type,
1663 osd->o_osd);
1664 *skip = 1;
1665 return NULL;
1666 }
f24e9980
SW
1667}
1668
1669/*
1670 * Wrappers to refcount containing ceph_osd struct
1671 */
1672static struct ceph_connection *get_osd_con(struct ceph_connection *con)
1673{
1674 struct ceph_osd *osd = con->private;
1675 if (get_osd(osd))
1676 return con;
1677 return NULL;
1678}
1679
1680static void put_osd_con(struct ceph_connection *con)
1681{
1682 struct ceph_osd *osd = con->private;
1683 put_osd(osd);
1684}
1685
4e7a5dcd
SW
/*
 * authentication
 */
static int get_authorizer(struct ceph_connection *con,
			  void **buf, int *len, int *proto,
			  void **reply_buf, int *reply_len, int force_new)
{
	struct ceph_osd *o = con->private;
	struct ceph_osd_client *osdc = o->o_osdc;
	struct ceph_auth_client *ac = osdc->client->monc.auth;
	int ret = 0;

	/* discard any cached authorizer if the caller wants a fresh one */
	if (force_new && o->o_authorizer) {
		ac->ops->destroy_authorizer(ac, o->o_authorizer);
		o->o_authorizer = NULL;
	}
	if (o->o_authorizer == NULL) {
		ret = ac->ops->create_authorizer(
			ac, CEPH_ENTITY_TYPE_OSD,
			&o->o_authorizer,
			&o->o_authorizer_buf,
			&o->o_authorizer_buf_len,
			&o->o_authorizer_reply_buf,
			&o->o_authorizer_reply_buf_len);
		if (ret)
			return ret;
	}

	/* hand the (possibly cached) authorizer buffers to the messenger */
	*proto = ac->protocol;
	*buf = o->o_authorizer_buf;
	*len = o->o_authorizer_buf_len;
	*reply_buf = o->o_authorizer_reply_buf;
	*reply_len = o->o_authorizer_reply_buf_len;
	return 0;
}
1721
1722
1723static int verify_authorizer_reply(struct ceph_connection *con, int len)
1724{
1725 struct ceph_osd *o = con->private;
1726 struct ceph_osd_client *osdc = o->o_osdc;
1727 struct ceph_auth_client *ac = osdc->client->monc.auth;
1728
1729 return ac->ops->verify_authorizer_reply(ac, o->o_authorizer, len);
1730}
1731
9bd2e6f8
SW
1732static int invalidate_authorizer(struct ceph_connection *con)
1733{
1734 struct ceph_osd *o = con->private;
1735 struct ceph_osd_client *osdc = o->o_osdc;
1736 struct ceph_auth_client *ac = osdc->client->monc.auth;
1737
1738 if (ac->ops->invalidate_authorizer)
1739 ac->ops->invalidate_authorizer(ac, CEPH_ENTITY_TYPE_OSD);
1740
1741 return ceph_monc_validate_auth(&osdc->client->monc);
1742}
4e7a5dcd 1743
/* messenger callbacks for connections to OSDs */
static const struct ceph_connection_operations osd_con_ops = {
	.get = get_osd_con,
	.put = put_osd_con,
	.dispatch = dispatch,
	.get_authorizer = get_authorizer,
	.verify_authorizer_reply = verify_authorizer_reply,
	.invalidate_authorizer = invalidate_authorizer,
	.alloc_msg = alloc_msg,
	.fault = osd_reset,
};