ceph: refactor osdc requests creation functions
[linux-2.6-block.git] / fs / ceph / osd_client.c
CommitLineData
f24e9980
SW
1#include "ceph_debug.h"
2
3#include <linux/err.h>
4#include <linux/highmem.h>
5#include <linux/mm.h>
6#include <linux/pagemap.h>
7#include <linux/slab.h>
8#include <linux/uaccess.h>
9
10#include "super.h"
11#include "osd_client.h"
12#include "messenger.h"
13#include "decode.h"
4e7a5dcd 14#include "auth.h"
f24e9980 15
c16e7869
SW
16#define OSD_OP_FRONT_LEN 4096
17#define OSD_OPREPLY_FRONT_LEN 512
0d59ab81 18
9e32789f 19static const struct ceph_connection_operations osd_con_ops;
422d2cb8
YS
20static int __kick_requests(struct ceph_osd_client *osdc,
21 struct ceph_osd *kickosd);
f24e9980
SW
22
23static void kick_requests(struct ceph_osd_client *osdc, struct ceph_osd *osd);
24
3499e8a5
YS
25void ceph_calc_raw_layout(struct ceph_osd_client *osdc,
26 struct ceph_file_layout *layout,
27 u64 snapid,
28 u64 off, u64 len, u64 *bno,
29 struct ceph_osd_request *req)
30{
31 struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
32 struct ceph_osd_op *op = (void *)(reqhead + 1);
33 u64 orig_len = len;
34 u64 objoff, objlen; /* extent in object */
35
36 reqhead->snapid = cpu_to_le64(snapid);
37
38 /* object extent? */
39 ceph_calc_file_object_mapping(layout, off, &len, bno,
40 &objoff, &objlen);
41 if (len < orig_len)
42 dout(" skipping last %llu, final file extent %llu~%llu\n",
43 orig_len - len, off, len);
44
45 op->extent.offset = cpu_to_le64(objoff);
46 op->extent.length = cpu_to_le64(objlen);
47 req->r_num_pages = calc_pages_for(off, len);
48
49 dout("calc_layout bno=%llx %llu~%llu (%d pages)\n",
50 *bno, objoff, objlen, req->r_num_pages);
51
52}
53
f24e9980
SW
54/*
55 * Implement client access to distributed object storage cluster.
56 *
57 * All data objects are stored within a cluster/cloud of OSDs, or
58 * "object storage devices." (Note that Ceph OSDs have _nothing_ to
59 * do with the T10 OSD extensions to SCSI.) Ceph OSDs are simply
60 * remote daemons serving up and coordinating consistent and safe
61 * access to storage.
62 *
63 * Cluster membership and the mapping of data objects onto storage devices
64 * are described by the osd map.
65 *
66 * We keep track of pending OSD requests (read, write), resubmit
67 * requests to different OSDs when the cluster topology/data layout
68 * change, or retry the affected requests when the communications
69 * channel with an OSD is reset.
70 */
71
72/*
73 * calculate the mapping of a file extent onto an object, and fill out the
74 * request accordingly. shorten extent as necessary if it crosses an
75 * object boundary.
76 *
77 * fill osd op in request message.
78 */
79static void calc_layout(struct ceph_osd_client *osdc,
3499e8a5
YS
80 struct ceph_vino vino,
81 struct ceph_file_layout *layout,
f24e9980
SW
82 u64 off, u64 *plen,
83 struct ceph_osd_request *req)
84{
f24e9980
SW
85 u64 bno;
86
3499e8a5 87 ceph_calc_raw_layout(osdc, layout, vino.snap, off, *plen, &bno, req);
f24e9980
SW
88
89 sprintf(req->r_oid, "%llx.%08llx", vino.ino, bno);
90 req->r_oid_len = strlen(req->r_oid);
f24e9980
SW
91}
92
f24e9980
SW
93/*
94 * requests
95 */
415e49a9 96void ceph_osdc_release_request(struct kref *kref)
f24e9980 97{
415e49a9
SW
98 struct ceph_osd_request *req = container_of(kref,
99 struct ceph_osd_request,
100 r_kref);
101
102 if (req->r_request)
103 ceph_msg_put(req->r_request);
104 if (req->r_reply)
105 ceph_msg_put(req->r_reply);
0d59ab81 106 if (req->r_con_filling_msg) {
350b1c32 107 dout("release_request revoking pages %p from con %p\n",
0d59ab81
YS
108 req->r_pages, req->r_con_filling_msg);
109 ceph_con_revoke_message(req->r_con_filling_msg,
110 req->r_reply);
111 ceph_con_put(req->r_con_filling_msg);
350b1c32 112 }
415e49a9
SW
113 if (req->r_own_pages)
114 ceph_release_page_vector(req->r_pages,
115 req->r_num_pages);
116 ceph_put_snap_context(req->r_snapc);
117 if (req->r_mempool)
118 mempool_free(req, req->r_osdc->req_mempool);
119 else
120 kfree(req);
f24e9980
SW
121}
122
3499e8a5
YS
123struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
124 int flags,
f24e9980
SW
125 struct ceph_snap_context *snapc,
126 int do_sync,
3499e8a5
YS
127 bool use_mempool,
128 gfp_t gfp_flags,
129 struct page **pages)
f24e9980
SW
130{
131 struct ceph_osd_request *req;
132 struct ceph_msg *msg;
0c948992 133 int num_op = 1 + do_sync;
3499e8a5
YS
134 size_t msg_size = sizeof(struct ceph_osd_request_head) +
135 num_op*sizeof(struct ceph_osd_op);
136
137 if (use_mempool) {
138 req = mempool_alloc(osdc->req_mempool, gfp_flags);
139 memset(req, 0, sizeof(*req));
140 } else {
141 req = kzalloc(sizeof(*req), gfp_flags);
142 }
143 if (!req)
144 return NULL;
f24e9980
SW
145
146 if (use_mempool) {
3499e8a5 147 req = mempool_alloc(osdc->req_mempool, gfp_flags);
f24e9980
SW
148 memset(req, 0, sizeof(*req));
149 } else {
3499e8a5 150 req = kzalloc(sizeof(*req), gfp_flags);
f24e9980
SW
151 }
152 if (req == NULL)
a79832f2 153 return NULL;
f24e9980 154
f24e9980
SW
155 req->r_osdc = osdc;
156 req->r_mempool = use_mempool;
415e49a9 157 kref_init(&req->r_kref);
f24e9980
SW
158 init_completion(&req->r_completion);
159 init_completion(&req->r_safe_completion);
160 INIT_LIST_HEAD(&req->r_unsafe_item);
161 req->r_flags = flags;
162
163 WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0);
164
c16e7869
SW
165 /* create reply message */
166 if (use_mempool)
167 msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
168 else
169 msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY,
3499e8a5 170 OSD_OPREPLY_FRONT_LEN, gfp_flags);
a79832f2 171 if (!msg) {
c16e7869 172 ceph_osdc_put_request(req);
a79832f2 173 return NULL;
c16e7869
SW
174 }
175 req->r_reply = msg;
176
177 /* create request message; allow space for oid */
f24e9980
SW
178 msg_size += 40;
179 if (snapc)
180 msg_size += sizeof(u64) * snapc->num_snaps;
181 if (use_mempool)
8f3bc053 182 msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
f24e9980 183 else
3499e8a5 184 msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, gfp_flags);
a79832f2 185 if (!msg) {
f24e9980 186 ceph_osdc_put_request(req);
a79832f2 187 return NULL;
f24e9980
SW
188 }
189 msg->hdr.type = cpu_to_le16(CEPH_MSG_OSD_OP);
190 memset(msg->front.iov_base, 0, msg->front.iov_len);
3499e8a5
YS
191
192 req->r_request = msg;
193 req->r_pages = pages;
194
195 return req;
196}
197
198/*
199 * build new request AND message
200 *
201 */
202void ceph_osdc_build_request(struct ceph_osd_request *req,
203 u64 off, u64 *plen,
204 int opcode,
205 struct ceph_snap_context *snapc,
206 int do_sync,
207 u32 truncate_seq,
208 u64 truncate_size,
209 struct timespec *mtime,
210 const char *oid,
211 int oid_len)
212{
213 struct ceph_msg *msg = req->r_request;
214 struct ceph_osd_request_head *head;
215 struct ceph_osd_op *op;
216 void *p;
217 int num_op = 1 + do_sync;
218 size_t msg_size = sizeof(*head) + num_op*sizeof(*op);
219 int i;
220 int flags = req->r_flags;
221
f24e9980
SW
222 head = msg->front.iov_base;
223 op = (void *)(head + 1);
224 p = (void *)(op + num_op);
225
f24e9980
SW
226 req->r_snapc = ceph_get_snap_context(snapc);
227
228 head->client_inc = cpu_to_le32(1); /* always, for now. */
229 head->flags = cpu_to_le32(flags);
230 if (flags & CEPH_OSD_FLAG_WRITE)
231 ceph_encode_timespec(&head->mtime, mtime);
232 head->num_ops = cpu_to_le16(num_op);
233 op->op = cpu_to_le16(opcode);
234
f24e9980
SW
235 if (flags & CEPH_OSD_FLAG_WRITE) {
236 req->r_request->hdr.data_off = cpu_to_le16(off);
237 req->r_request->hdr.data_len = cpu_to_le32(*plen);
238 op->payload_len = cpu_to_le32(*plen);
239 }
0c948992
YS
240 op->extent.truncate_size = cpu_to_le64(truncate_size);
241 op->extent.truncate_seq = cpu_to_le32(truncate_seq);
f24e9980
SW
242
243 /* fill in oid */
3499e8a5
YS
244 head->object_len = cpu_to_le32(oid_len);
245 memcpy(p, oid, oid_len);
246 p += oid_len;
f24e9980 247
f24e9980
SW
248 if (do_sync) {
249 op++;
250 op->op = cpu_to_le16(CEPH_OSD_OP_STARTSYNC);
251 }
252 if (snapc) {
253 head->snap_seq = cpu_to_le64(snapc->seq);
254 head->num_snaps = cpu_to_le32(snapc->num_snaps);
255 for (i = 0; i < snapc->num_snaps; i++) {
256 put_unaligned_le64(snapc->snaps[i], p);
257 p += sizeof(u64);
258 }
259 }
260
261 BUG_ON(p > msg->front.iov_base + msg->front.iov_len);
6f863e71
SW
262 msg_size = p - msg->front.iov_base;
263 msg->front.iov_len = msg_size;
264 msg->hdr.front_len = cpu_to_le32(msg_size);
3499e8a5
YS
265 return;
266}
267
268/*
269 * build new request AND message, calculate layout, and adjust file
270 * extent as needed.
271 *
272 * if the file was recently truncated, we include information about its
273 * old and new size so that the object can be updated appropriately. (we
274 * avoid synchronously deleting truncated objects because it's slow.)
275 *
276 * if @do_sync, include a 'startsync' command so that the osd will flush
277 * data quickly.
278 */
279struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
280 struct ceph_file_layout *layout,
281 struct ceph_vino vino,
282 u64 off, u64 *plen,
283 int opcode, int flags,
284 struct ceph_snap_context *snapc,
285 int do_sync,
286 u32 truncate_seq,
287 u64 truncate_size,
288 struct timespec *mtime,
289 bool use_mempool, int num_reply)
290{
291 struct ceph_osd_request *req =
292 ceph_osdc_alloc_request(osdc, flags,
293 snapc, do_sync,
294 use_mempool,
295 GFP_NOFS, NULL);
296 if (IS_ERR(req))
297 return req;
298
299 /* calculate max write size */
300 calc_layout(osdc, vino, layout, off, plen, req);
301 req->r_file_layout = *layout; /* keep a copy */
302
303 ceph_osdc_build_request(req, off, plen, opcode,
304 snapc, do_sync,
305 truncate_seq, truncate_size,
306 mtime,
307 req->r_oid, req->r_oid_len);
308
f24e9980
SW
309 return req;
310}
311
312/*
313 * We keep osd requests in an rbtree, sorted by ->r_tid.
314 */
315static void __insert_request(struct ceph_osd_client *osdc,
316 struct ceph_osd_request *new)
317{
318 struct rb_node **p = &osdc->requests.rb_node;
319 struct rb_node *parent = NULL;
320 struct ceph_osd_request *req = NULL;
321
322 while (*p) {
323 parent = *p;
324 req = rb_entry(parent, struct ceph_osd_request, r_node);
325 if (new->r_tid < req->r_tid)
326 p = &(*p)->rb_left;
327 else if (new->r_tid > req->r_tid)
328 p = &(*p)->rb_right;
329 else
330 BUG();
331 }
332
333 rb_link_node(&new->r_node, parent, p);
334 rb_insert_color(&new->r_node, &osdc->requests);
335}
336
337static struct ceph_osd_request *__lookup_request(struct ceph_osd_client *osdc,
338 u64 tid)
339{
340 struct ceph_osd_request *req;
341 struct rb_node *n = osdc->requests.rb_node;
342
343 while (n) {
344 req = rb_entry(n, struct ceph_osd_request, r_node);
345 if (tid < req->r_tid)
346 n = n->rb_left;
347 else if (tid > req->r_tid)
348 n = n->rb_right;
349 else
350 return req;
351 }
352 return NULL;
353}
354
355static struct ceph_osd_request *
356__lookup_request_ge(struct ceph_osd_client *osdc,
357 u64 tid)
358{
359 struct ceph_osd_request *req;
360 struct rb_node *n = osdc->requests.rb_node;
361
362 while (n) {
363 req = rb_entry(n, struct ceph_osd_request, r_node);
364 if (tid < req->r_tid) {
365 if (!n->rb_left)
366 return req;
367 n = n->rb_left;
368 } else if (tid > req->r_tid) {
369 n = n->rb_right;
370 } else {
371 return req;
372 }
373 }
374 return NULL;
375}
376
377
378/*
81b024e7 379 * If the osd connection drops, we need to resubmit all requests.
f24e9980
SW
380 */
381static void osd_reset(struct ceph_connection *con)
382{
383 struct ceph_osd *osd = con->private;
384 struct ceph_osd_client *osdc;
385
386 if (!osd)
387 return;
388 dout("osd_reset osd%d\n", osd->o_osd);
389 osdc = osd->o_osdc;
f24e9980
SW
390 down_read(&osdc->map_sem);
391 kick_requests(osdc, osd);
392 up_read(&osdc->map_sem);
393}
394
395/*
396 * Track open sessions with osds.
397 */
398static struct ceph_osd *create_osd(struct ceph_osd_client *osdc)
399{
400 struct ceph_osd *osd;
401
402 osd = kzalloc(sizeof(*osd), GFP_NOFS);
403 if (!osd)
404 return NULL;
405
406 atomic_set(&osd->o_ref, 1);
407 osd->o_osdc = osdc;
408 INIT_LIST_HEAD(&osd->o_requests);
f5a2041b 409 INIT_LIST_HEAD(&osd->o_osd_lru);
f24e9980
SW
410 osd->o_incarnation = 1;
411
412 ceph_con_init(osdc->client->msgr, &osd->o_con);
413 osd->o_con.private = osd;
414 osd->o_con.ops = &osd_con_ops;
415 osd->o_con.peer_name.type = CEPH_ENTITY_TYPE_OSD;
4e7a5dcd 416
422d2cb8 417 INIT_LIST_HEAD(&osd->o_keepalive_item);
f24e9980
SW
418 return osd;
419}
420
421static struct ceph_osd *get_osd(struct ceph_osd *osd)
422{
423 if (atomic_inc_not_zero(&osd->o_ref)) {
424 dout("get_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref)-1,
425 atomic_read(&osd->o_ref));
426 return osd;
427 } else {
428 dout("get_osd %p FAIL\n", osd);
429 return NULL;
430 }
431}
432
433static void put_osd(struct ceph_osd *osd)
434{
435 dout("put_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref),
436 atomic_read(&osd->o_ref) - 1);
79494d1b
SW
437 if (atomic_dec_and_test(&osd->o_ref)) {
438 struct ceph_auth_client *ac = osd->o_osdc->client->monc.auth;
439
440 if (osd->o_authorizer)
441 ac->ops->destroy_authorizer(ac, osd->o_authorizer);
f24e9980 442 kfree(osd);
79494d1b 443 }
f24e9980
SW
444}
445
446/*
447 * remove an osd from our map
448 */
f5a2041b 449static void __remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
f24e9980 450{
f5a2041b 451 dout("__remove_osd %p\n", osd);
f24e9980
SW
452 BUG_ON(!list_empty(&osd->o_requests));
453 rb_erase(&osd->o_node, &osdc->osds);
f5a2041b 454 list_del_init(&osd->o_osd_lru);
f24e9980
SW
455 ceph_con_close(&osd->o_con);
456 put_osd(osd);
457}
458
f5a2041b
YS
459static void __move_osd_to_lru(struct ceph_osd_client *osdc,
460 struct ceph_osd *osd)
461{
462 dout("__move_osd_to_lru %p\n", osd);
463 BUG_ON(!list_empty(&osd->o_osd_lru));
464 list_add_tail(&osd->o_osd_lru, &osdc->osd_lru);
465 osd->lru_ttl = jiffies + osdc->client->mount_args->osd_idle_ttl * HZ;
466}
467
468static void __remove_osd_from_lru(struct ceph_osd *osd)
469{
470 dout("__remove_osd_from_lru %p\n", osd);
471 if (!list_empty(&osd->o_osd_lru))
472 list_del_init(&osd->o_osd_lru);
473}
474
475static void remove_old_osds(struct ceph_osd_client *osdc, int remove_all)
476{
477 struct ceph_osd *osd, *nosd;
478
479 dout("__remove_old_osds %p\n", osdc);
480 mutex_lock(&osdc->request_mutex);
481 list_for_each_entry_safe(osd, nosd, &osdc->osd_lru, o_osd_lru) {
482 if (!remove_all && time_before(jiffies, osd->lru_ttl))
483 break;
484 __remove_osd(osdc, osd);
485 }
486 mutex_unlock(&osdc->request_mutex);
487}
488
f24e9980
SW
489/*
490 * reset osd connect
491 */
f5a2041b 492static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
f24e9980 493{
87b315a5 494 struct ceph_osd_request *req;
f24e9980
SW
495 int ret = 0;
496
f5a2041b 497 dout("__reset_osd %p osd%d\n", osd, osd->o_osd);
f24e9980 498 if (list_empty(&osd->o_requests)) {
f5a2041b 499 __remove_osd(osdc, osd);
87b315a5
SW
500 } else if (memcmp(&osdc->osdmap->osd_addr[osd->o_osd],
501 &osd->o_con.peer_addr,
502 sizeof(osd->o_con.peer_addr)) == 0 &&
503 !ceph_con_opened(&osd->o_con)) {
504 dout(" osd addr hasn't changed and connection never opened,"
505 " letting msgr retry");
506 /* touch each r_stamp for handle_timeout()'s benfit */
507 list_for_each_entry(req, &osd->o_requests, r_osd_item)
508 req->r_stamp = jiffies;
509 ret = -EAGAIN;
f24e9980
SW
510 } else {
511 ceph_con_close(&osd->o_con);
512 ceph_con_open(&osd->o_con, &osdc->osdmap->osd_addr[osd->o_osd]);
513 osd->o_incarnation++;
514 }
515 return ret;
516}
517
518static void __insert_osd(struct ceph_osd_client *osdc, struct ceph_osd *new)
519{
520 struct rb_node **p = &osdc->osds.rb_node;
521 struct rb_node *parent = NULL;
522 struct ceph_osd *osd = NULL;
523
524 while (*p) {
525 parent = *p;
526 osd = rb_entry(parent, struct ceph_osd, o_node);
527 if (new->o_osd < osd->o_osd)
528 p = &(*p)->rb_left;
529 else if (new->o_osd > osd->o_osd)
530 p = &(*p)->rb_right;
531 else
532 BUG();
533 }
534
535 rb_link_node(&new->o_node, parent, p);
536 rb_insert_color(&new->o_node, &osdc->osds);
537}
538
539static struct ceph_osd *__lookup_osd(struct ceph_osd_client *osdc, int o)
540{
541 struct ceph_osd *osd;
542 struct rb_node *n = osdc->osds.rb_node;
543
544 while (n) {
545 osd = rb_entry(n, struct ceph_osd, o_node);
546 if (o < osd->o_osd)
547 n = n->rb_left;
548 else if (o > osd->o_osd)
549 n = n->rb_right;
550 else
551 return osd;
552 }
553 return NULL;
554}
555
422d2cb8
YS
556static void __schedule_osd_timeout(struct ceph_osd_client *osdc)
557{
558 schedule_delayed_work(&osdc->timeout_work,
559 osdc->client->mount_args->osd_keepalive_timeout * HZ);
560}
561
562static void __cancel_osd_timeout(struct ceph_osd_client *osdc)
563{
564 cancel_delayed_work(&osdc->timeout_work);
565}
f24e9980
SW
566
567/*
568 * Register request, assign tid. If this is the first request, set up
569 * the timeout event.
570 */
571static void register_request(struct ceph_osd_client *osdc,
572 struct ceph_osd_request *req)
573{
f24e9980
SW
574 mutex_lock(&osdc->request_mutex);
575 req->r_tid = ++osdc->last_tid;
6df058c0 576 req->r_request->hdr.tid = cpu_to_le64(req->r_tid);
422d2cb8 577 INIT_LIST_HEAD(&req->r_req_lru_item);
f24e9980
SW
578
579 dout("register_request %p tid %lld\n", req, req->r_tid);
580 __insert_request(osdc, req);
581 ceph_osdc_get_request(req);
582 osdc->num_requests++;
583
f24e9980 584 if (osdc->num_requests == 1) {
422d2cb8
YS
585 dout(" first request, scheduling timeout\n");
586 __schedule_osd_timeout(osdc);
f24e9980
SW
587 }
588 mutex_unlock(&osdc->request_mutex);
589}
590
591/*
592 * called under osdc->request_mutex
593 */
594static void __unregister_request(struct ceph_osd_client *osdc,
595 struct ceph_osd_request *req)
596{
597 dout("__unregister_request %p tid %lld\n", req, req->r_tid);
598 rb_erase(&req->r_node, &osdc->requests);
599 osdc->num_requests--;
600
0ba6478d
SW
601 if (req->r_osd) {
602 /* make sure the original request isn't in flight. */
603 ceph_con_revoke(&req->r_osd->o_con, req->r_request);
604
605 list_del_init(&req->r_osd_item);
606 if (list_empty(&req->r_osd->o_requests))
f5a2041b 607 __move_osd_to_lru(osdc, req->r_osd);
0ba6478d
SW
608 req->r_osd = NULL;
609 }
f24e9980
SW
610
611 ceph_osdc_put_request(req);
612
422d2cb8
YS
613 list_del_init(&req->r_req_lru_item);
614 if (osdc->num_requests == 0) {
615 dout(" no requests, canceling timeout\n");
616 __cancel_osd_timeout(osdc);
f24e9980
SW
617 }
618}
619
620/*
621 * Cancel a previously queued request message
622 */
623static void __cancel_request(struct ceph_osd_request *req)
624{
6bc18876 625 if (req->r_sent && req->r_osd) {
f24e9980
SW
626 ceph_con_revoke(&req->r_osd->o_con, req->r_request);
627 req->r_sent = 0;
628 }
422d2cb8 629 list_del_init(&req->r_req_lru_item);
f24e9980
SW
630}
631
632/*
633 * Pick an osd (the first 'up' osd in the pg), allocate the osd struct
634 * (as needed), and set the request r_osd appropriately. If there is
635 * no up osd, set r_osd to NULL.
636 *
637 * Return 0 if unchanged, 1 if changed, or negative on error.
638 *
639 * Caller should hold map_sem for read and request_mutex.
640 */
641static int __map_osds(struct ceph_osd_client *osdc,
642 struct ceph_osd_request *req)
643{
644 struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
51042122 645 struct ceph_pg pgid;
d85b7056
SW
646 int acting[CEPH_PG_MAX_SIZE];
647 int o = -1, num = 0;
f24e9980 648 int err;
f24e9980
SW
649
650 dout("map_osds %p tid %lld\n", req, req->r_tid);
651 err = ceph_calc_object_layout(&reqhead->layout, req->r_oid,
652 &req->r_file_layout, osdc->osdmap);
653 if (err)
654 return err;
51042122 655 pgid = reqhead->layout.ol_pgid;
7740a42f
SW
656 req->r_pgid = pgid;
657
d85b7056
SW
658 err = ceph_calc_pg_acting(osdc->osdmap, pgid, acting);
659 if (err > 0) {
660 o = acting[0];
661 num = err;
662 }
f24e9980
SW
663
664 if ((req->r_osd && req->r_osd->o_osd == o &&
d85b7056
SW
665 req->r_sent >= req->r_osd->o_incarnation &&
666 req->r_num_pg_osds == num &&
667 memcmp(req->r_pg_osds, acting, sizeof(acting[0])*num) == 0) ||
f24e9980
SW
668 (req->r_osd == NULL && o == -1))
669 return 0; /* no change */
670
51042122
SW
671 dout("map_osds tid %llu pgid %d.%x osd%d (was osd%d)\n",
672 req->r_tid, le32_to_cpu(pgid.pool), le16_to_cpu(pgid.ps), o,
f24e9980
SW
673 req->r_osd ? req->r_osd->o_osd : -1);
674
d85b7056
SW
675 /* record full pg acting set */
676 memcpy(req->r_pg_osds, acting, sizeof(acting[0]) * num);
677 req->r_num_pg_osds = num;
678
f24e9980
SW
679 if (req->r_osd) {
680 __cancel_request(req);
681 list_del_init(&req->r_osd_item);
f24e9980
SW
682 req->r_osd = NULL;
683 }
684
685 req->r_osd = __lookup_osd(osdc, o);
686 if (!req->r_osd && o >= 0) {
c99eb1c7
SW
687 err = -ENOMEM;
688 req->r_osd = create_osd(osdc);
689 if (!req->r_osd)
690 goto out;
f24e9980
SW
691
692 dout("map_osds osd %p is osd%d\n", req->r_osd, o);
693 req->r_osd->o_osd = o;
694 req->r_osd->o_con.peer_name.num = cpu_to_le64(o);
695 __insert_osd(osdc, req->r_osd);
696
697 ceph_con_open(&req->r_osd->o_con, &osdc->osdmap->osd_addr[o]);
698 }
699
f5a2041b
YS
700 if (req->r_osd) {
701 __remove_osd_from_lru(req->r_osd);
f24e9980 702 list_add(&req->r_osd_item, &req->r_osd->o_requests);
f5a2041b 703 }
d85b7056 704 err = 1; /* osd or pg changed */
f24e9980
SW
705
706out:
f24e9980
SW
707 return err;
708}
709
710/*
711 * caller should hold map_sem (for read) and request_mutex
712 */
713static int __send_request(struct ceph_osd_client *osdc,
714 struct ceph_osd_request *req)
715{
716 struct ceph_osd_request_head *reqhead;
717 int err;
718
719 err = __map_osds(osdc, req);
720 if (err < 0)
721 return err;
722 if (req->r_osd == NULL) {
723 dout("send_request %p no up osds in pg\n", req);
724 ceph_monc_request_next_osdmap(&osdc->client->monc);
725 return 0;
726 }
727
728 dout("send_request %p tid %llu to osd%d flags %d\n",
729 req, req->r_tid, req->r_osd->o_osd, req->r_flags);
730
731 reqhead = req->r_request->front.iov_base;
732 reqhead->osdmap_epoch = cpu_to_le32(osdc->osdmap->epoch);
733 reqhead->flags |= cpu_to_le32(req->r_flags); /* e.g., RETRY */
734 reqhead->reassert_version = req->r_reassert_version;
735
3dd72fc0 736 req->r_stamp = jiffies;
07a27e22 737 list_move_tail(&req->r_req_lru_item, &osdc->req_lru);
f24e9980
SW
738
739 ceph_msg_get(req->r_request); /* send consumes a ref */
740 ceph_con_send(&req->r_osd->o_con, req->r_request);
741 req->r_sent = req->r_osd->o_incarnation;
742 return 0;
743}
744
745/*
746 * Timeout callback, called every N seconds when 1 or more osd
747 * requests has been active for more than N seconds. When this
748 * happens, we ping all OSDs with requests who have timed out to
749 * ensure any communications channel reset is detected. Reset the
750 * request timeouts another N seconds in the future as we go.
751 * Reschedule the timeout event another N seconds in future (unless
752 * there are no open requests).
753 */
754static void handle_timeout(struct work_struct *work)
755{
756 struct ceph_osd_client *osdc =
757 container_of(work, struct ceph_osd_client, timeout_work.work);
422d2cb8 758 struct ceph_osd_request *req, *last_req = NULL;
f24e9980 759 struct ceph_osd *osd;
6b805185 760 unsigned long timeout = osdc->client->mount_args->osd_timeout * HZ;
422d2cb8
YS
761 unsigned long keepalive =
762 osdc->client->mount_args->osd_keepalive_timeout * HZ;
3dd72fc0 763 unsigned long last_stamp = 0;
f24e9980 764 struct rb_node *p;
422d2cb8 765 struct list_head slow_osds;
f24e9980
SW
766
767 dout("timeout\n");
768 down_read(&osdc->map_sem);
769
770 ceph_monc_request_next_osdmap(&osdc->client->monc);
771
772 mutex_lock(&osdc->request_mutex);
773 for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
774 req = rb_entry(p, struct ceph_osd_request, r_node);
775
776 if (req->r_resend) {
777 int err;
778
779 dout("osdc resending prev failed %lld\n", req->r_tid);
780 err = __send_request(osdc, req);
781 if (err)
782 dout("osdc failed again on %lld\n", req->r_tid);
783 else
784 req->r_resend = false;
785 continue;
786 }
787 }
f24e9980 788
422d2cb8
YS
789 /*
790 * reset osds that appear to be _really_ unresponsive. this
791 * is a failsafe measure.. we really shouldn't be getting to
792 * this point if the system is working properly. the monitors
793 * should mark the osd as failed and we should find out about
794 * it from an updated osd map.
795 */
f26e681d 796 while (timeout && !list_empty(&osdc->req_lru)) {
422d2cb8
YS
797 req = list_entry(osdc->req_lru.next, struct ceph_osd_request,
798 r_req_lru_item);
799
3dd72fc0 800 if (time_before(jiffies, req->r_stamp + timeout))
422d2cb8
YS
801 break;
802
3dd72fc0 803 BUG_ON(req == last_req && req->r_stamp == last_stamp);
422d2cb8 804 last_req = req;
3dd72fc0 805 last_stamp = req->r_stamp;
422d2cb8
YS
806
807 osd = req->r_osd;
808 BUG_ON(!osd);
809 pr_warning(" tid %llu timed out on osd%d, will reset osd\n",
810 req->r_tid, osd->o_osd);
811 __kick_requests(osdc, osd);
812 }
813
814 /*
815 * ping osds that are a bit slow. this ensures that if there
816 * is a break in the TCP connection we will notice, and reopen
817 * a connection with that osd (from the fault callback).
818 */
819 INIT_LIST_HEAD(&slow_osds);
820 list_for_each_entry(req, &osdc->req_lru, r_req_lru_item) {
3dd72fc0 821 if (time_before(jiffies, req->r_stamp + keepalive))
422d2cb8
YS
822 break;
823
824 osd = req->r_osd;
825 BUG_ON(!osd);
826 dout(" tid %llu is slow, will send keepalive on osd%d\n",
f24e9980 827 req->r_tid, osd->o_osd);
422d2cb8
YS
828 list_move_tail(&osd->o_keepalive_item, &slow_osds);
829 }
830 while (!list_empty(&slow_osds)) {
831 osd = list_entry(slow_osds.next, struct ceph_osd,
832 o_keepalive_item);
833 list_del_init(&osd->o_keepalive_item);
f24e9980
SW
834 ceph_con_keepalive(&osd->o_con);
835 }
836
422d2cb8 837 __schedule_osd_timeout(osdc);
f24e9980
SW
838 mutex_unlock(&osdc->request_mutex);
839
840 up_read(&osdc->map_sem);
841}
842
f5a2041b
YS
843static void handle_osds_timeout(struct work_struct *work)
844{
845 struct ceph_osd_client *osdc =
846 container_of(work, struct ceph_osd_client,
847 osds_timeout_work.work);
848 unsigned long delay =
849 osdc->client->mount_args->osd_idle_ttl * HZ >> 2;
850
851 dout("osds timeout\n");
852 down_read(&osdc->map_sem);
853 remove_old_osds(osdc, 0);
854 up_read(&osdc->map_sem);
855
856 schedule_delayed_work(&osdc->osds_timeout_work,
857 round_jiffies_relative(delay));
858}
859
f24e9980
SW
860/*
861 * handle osd op reply. either call the callback if it is specified,
862 * or do the completion to wake up the waiting thread.
863 */
350b1c32
SW
864static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
865 struct ceph_connection *con)
f24e9980
SW
866{
867 struct ceph_osd_reply_head *rhead = msg->front.iov_base;
868 struct ceph_osd_request *req;
869 u64 tid;
870 int numops, object_len, flags;
0ceed5db 871 s32 result;
f24e9980 872
6df058c0 873 tid = le64_to_cpu(msg->hdr.tid);
f24e9980
SW
874 if (msg->front.iov_len < sizeof(*rhead))
875 goto bad;
f24e9980
SW
876 numops = le32_to_cpu(rhead->num_ops);
877 object_len = le32_to_cpu(rhead->object_len);
0ceed5db 878 result = le32_to_cpu(rhead->result);
f24e9980
SW
879 if (msg->front.iov_len != sizeof(*rhead) + object_len +
880 numops * sizeof(struct ceph_osd_op))
881 goto bad;
0ceed5db 882 dout("handle_reply %p tid %llu result %d\n", msg, tid, (int)result);
f24e9980
SW
883
884 /* lookup */
885 mutex_lock(&osdc->request_mutex);
886 req = __lookup_request(osdc, tid);
887 if (req == NULL) {
888 dout("handle_reply tid %llu dne\n", tid);
889 mutex_unlock(&osdc->request_mutex);
890 return;
891 }
892 ceph_osdc_get_request(req);
893 flags = le32_to_cpu(rhead->flags);
894
350b1c32 895 /*
0d59ab81 896 * if this connection filled our message, drop our reference now, to
350b1c32
SW
897 * avoid a (safe but slower) revoke later.
898 */
0d59ab81 899 if (req->r_con_filling_msg == con && req->r_reply == msg) {
c16e7869 900 dout(" dropping con_filling_msg ref %p\n", con);
0d59ab81 901 req->r_con_filling_msg = NULL;
350b1c32
SW
902 ceph_con_put(con);
903 }
904
f24e9980
SW
905 if (!req->r_got_reply) {
906 unsigned bytes;
907
908 req->r_result = le32_to_cpu(rhead->result);
909 bytes = le32_to_cpu(msg->hdr.data_len);
910 dout("handle_reply result %d bytes %d\n", req->r_result,
911 bytes);
912 if (req->r_result == 0)
913 req->r_result = bytes;
914
915 /* in case this is a write and we need to replay, */
916 req->r_reassert_version = rhead->reassert_version;
917
918 req->r_got_reply = 1;
919 } else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) {
920 dout("handle_reply tid %llu dup ack\n", tid);
34b43a56 921 mutex_unlock(&osdc->request_mutex);
f24e9980
SW
922 goto done;
923 }
924
925 dout("handle_reply tid %llu flags %d\n", tid, flags);
926
927 /* either this is a read, or we got the safe response */
0ceed5db
SW
928 if (result < 0 ||
929 (flags & CEPH_OSD_FLAG_ONDISK) ||
f24e9980
SW
930 ((flags & CEPH_OSD_FLAG_WRITE) == 0))
931 __unregister_request(osdc, req);
932
933 mutex_unlock(&osdc->request_mutex);
934
935 if (req->r_callback)
936 req->r_callback(req, msg);
937 else
03066f23 938 complete_all(&req->r_completion);
f24e9980
SW
939
940 if (flags & CEPH_OSD_FLAG_ONDISK) {
941 if (req->r_safe_callback)
942 req->r_safe_callback(req, msg);
03066f23 943 complete_all(&req->r_safe_completion); /* fsync waiter */
f24e9980
SW
944 }
945
946done:
947 ceph_osdc_put_request(req);
948 return;
949
950bad:
951 pr_err("corrupt osd_op_reply got %d %d expected %d\n",
952 (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len),
953 (int)sizeof(*rhead));
9ec7cab1 954 ceph_msg_dump(msg);
f24e9980
SW
955}
956
957
422d2cb8 958static int __kick_requests(struct ceph_osd_client *osdc,
f24e9980
SW
959 struct ceph_osd *kickosd)
960{
961 struct ceph_osd_request *req;
962 struct rb_node *p, *n;
963 int needmap = 0;
964 int err;
965
966 dout("kick_requests osd%d\n", kickosd ? kickosd->o_osd : -1);
153a008b 967 if (kickosd) {
87b315a5
SW
968 err = __reset_osd(osdc, kickosd);
969 if (err == -EAGAIN)
970 return 1;
153a008b 971 } else {
f24e9980
SW
972 for (p = rb_first(&osdc->osds); p; p = n) {
973 struct ceph_osd *osd =
974 rb_entry(p, struct ceph_osd, o_node);
975
976 n = rb_next(p);
977 if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) ||
103e2d3a
SW
978 memcmp(&osd->o_con.peer_addr,
979 ceph_osd_addr(osdc->osdmap,
980 osd->o_osd),
981 sizeof(struct ceph_entity_addr)) != 0)
f5a2041b 982 __reset_osd(osdc, osd);
f24e9980
SW
983 }
984 }
985
986 for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
987 req = rb_entry(p, struct ceph_osd_request, r_node);
988
989 if (req->r_resend) {
990 dout(" r_resend set on tid %llu\n", req->r_tid);
266673db 991 __cancel_request(req);
f24e9980
SW
992 goto kick;
993 }
266673db
SW
994 if (req->r_osd && kickosd == req->r_osd) {
995 __cancel_request(req);
f24e9980 996 goto kick;
266673db 997 }
f24e9980
SW
998
999 err = __map_osds(osdc, req);
1000 if (err == 0)
1001 continue; /* no change */
1002 if (err < 0) {
1003 /*
1004 * FIXME: really, we should set the request
1005 * error and fail if this isn't a 'nofail'
1006 * request, but that's a fair bit more
1007 * complicated to do. So retry!
1008 */
1009 dout(" setting r_resend on %llu\n", req->r_tid);
1010 req->r_resend = true;
1011 continue;
1012 }
1013 if (req->r_osd == NULL) {
1014 dout("tid %llu maps to no valid osd\n", req->r_tid);
1015 needmap++; /* request a newer map */
1016 continue;
1017 }
1018
1019kick:
c1ea8823 1020 dout("kicking %p tid %llu osd%d\n", req, req->r_tid,
12eadc19 1021 req->r_osd ? req->r_osd->o_osd : -1);
f24e9980
SW
1022 req->r_flags |= CEPH_OSD_FLAG_RETRY;
1023 err = __send_request(osdc, req);
1024 if (err) {
1025 dout(" setting r_resend on %llu\n", req->r_tid);
1026 req->r_resend = true;
1027 }
1028 }
422d2cb8
YS
1029
1030 return needmap;
1031}
1032
/*
 * Resubmit osd requests whose osd or osd address has changed.  Request
 * a new osd map if osds are down, or we are otherwise unable to determine
 * how to direct a request.
 *
 * Close connections to down osds.
 *
 * If @kickosd is specified, resubmit only requests targeting that osd.
 *
 * Caller should hold map_sem for read; request_mutex is taken here.
 */
static void kick_requests(struct ceph_osd_client *osdc,
			  struct ceph_osd *kickosd)
{
	int needmap;

	mutex_lock(&osdc->request_mutex);
	needmap = __kick_requests(osdc, kickosd);
	mutex_unlock(&osdc->request_mutex);

	/* some requests mapped to no usable osd: ask the monitor for a
	 * newer osdmap so they can be redirected */
	if (needmap) {
		dout("%d requests for down osds, need new map\n", needmap);
		ceph_monc_request_next_osdmap(&osdc->client->monc);
	}

}
f24e9980
SW
/*
 * Process updated osd map.
 *
 * The message contains any number of incremental and full maps, normally
 * indicating some sort of topology change in the cluster.  Kick requests
 * off to different OSDs as needed.
 */
void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
{
	void *p, *end, *next;
	u32 nr_maps, maplen;
	u32 epoch;
	struct ceph_osdmap *newmap = NULL, *oldmap;
	int err;
	struct ceph_fsid fsid;

	dout("handle_map have %u\n", osdc->osdmap ? osdc->osdmap->epoch : 0);
	p = msg->front.iov_base;
	end = p + msg->front.iov_len;

	/* verify fsid: drop the message if it is for a different cluster */
	ceph_decode_need(&p, end, sizeof(fsid), bad);
	ceph_decode_copy(&p, &fsid, sizeof(fsid));
	if (ceph_check_fsid(osdc->client, &fsid) < 0)
		return;

	down_write(&osdc->map_sem);

	/* incremental maps: apply each one that directly follows our
	 * current epoch; skip any others */
	ceph_decode_32_safe(&p, end, nr_maps, bad);
	dout(" %d inc maps\n", nr_maps);
	while (nr_maps > 0) {
		ceph_decode_need(&p, end, 2*sizeof(u32), bad);
		epoch = ceph_decode_32(&p);
		maplen = ceph_decode_32(&p);
		ceph_decode_need(&p, end, maplen, bad);
		next = p + maplen;
		if (osdc->osdmap && osdc->osdmap->epoch+1 == epoch) {
			dout("applying incremental map %u len %d\n",
			     epoch, maplen);
			newmap = osdmap_apply_incremental(&p, next,
							  osdc->osdmap,
							  osdc->client->msgr);
			if (IS_ERR(newmap)) {
				err = PTR_ERR(newmap);
				goto bad;
			}
			BUG_ON(!newmap);
			/* the incremental may return the same map object */
			if (newmap != osdc->osdmap) {
				ceph_osdmap_destroy(osdc->osdmap);
				osdc->osdmap = newmap;
			}
		} else {
			dout("ignoring incremental map %u len %d\n",
			     epoch, maplen);
		}
		p = next;
		nr_maps--;
	}
	if (newmap)
		goto done;

	/* full maps: only decode the last (newest) one, and only if it is
	 * newer than what we already have */
	ceph_decode_32_safe(&p, end, nr_maps, bad);
	dout(" %d full maps\n", nr_maps);
	while (nr_maps) {
		ceph_decode_need(&p, end, 2*sizeof(u32), bad);
		epoch = ceph_decode_32(&p);
		maplen = ceph_decode_32(&p);
		ceph_decode_need(&p, end, maplen, bad);
		if (nr_maps > 1) {
			dout("skipping non-latest full map %u len %d\n",
			     epoch, maplen);
		} else if (osdc->osdmap && osdc->osdmap->epoch >= epoch) {
			dout("skipping full map %u len %d, "
			     "older than our %u\n", epoch, maplen,
			     osdc->osdmap->epoch);
		} else {
			dout("taking full map %u len %d\n", epoch, maplen);
			newmap = osdmap_decode(&p, p+maplen);
			if (IS_ERR(newmap)) {
				err = PTR_ERR(newmap);
				goto bad;
			}
			BUG_ON(!newmap);
			oldmap = osdc->osdmap;
			osdc->osdmap = newmap;
			if (oldmap)
				ceph_osdmap_destroy(oldmap);
		}
		p += maplen;
		nr_maps--;
	}

done:
	/* keep the map locked for read while resending so it can't change
	 * underneath kick_requests() */
	downgrade_write(&osdc->map_sem);
	ceph_monc_got_osdmap(&osdc->client->monc, osdc->osdmap->epoch);
	if (newmap)
		kick_requests(osdc, NULL);
	up_read(&osdc->map_sem);
	wake_up_all(&osdc->client->auth_wq);
	return;

bad:
	/* NOTE(review): err is assigned on the failure paths above but never
	 * reported here; the corruption is only logged */
	pr_err("osdc handle_map corrupt msg\n");
	ceph_msg_dump(msg);
	up_write(&osdc->map_sem);
	return;
}
1168
f24e9980
SW
/*
 * Register request, send initial attempt.
 *
 * If @nofail is set, a send failure marks the request for resend instead
 * of unregistering it, and 0 is returned.  Otherwise the send error is
 * returned and the request is unregistered.
 */
int ceph_osdc_start_request(struct ceph_osd_client *osdc,
			    struct ceph_osd_request *req,
			    bool nofail)
{
	int rc = 0;

	/* attach the data pages to the outgoing message */
	req->r_request->pages = req->r_pages;
	req->r_request->nr_pages = req->r_num_pages;

	register_request(osdc, req);

	down_read(&osdc->map_sem);
	mutex_lock(&osdc->request_mutex);
	/*
	 * a racing kick_requests() may have sent the message for us
	 * while we dropped request_mutex above, so only send now if
	 * the request still hasn't been touched yet.
	 */
	if (req->r_sent == 0) {
		rc = __send_request(osdc, req);
		if (rc) {
			if (nofail) {
				dout("osdc_start_request failed send, "
				     " marking %lld\n", req->r_tid);
				req->r_resend = true;
				rc = 0;
			} else {
				__unregister_request(osdc, req);
			}
		}
	}
	mutex_unlock(&osdc->request_mutex);
	up_read(&osdc->map_sem);
	return rc;
}
1207
1208/*
1209 * wait for a request to complete
1210 */
1211int ceph_osdc_wait_request(struct ceph_osd_client *osdc,
1212 struct ceph_osd_request *req)
1213{
1214 int rc;
1215
1216 rc = wait_for_completion_interruptible(&req->r_completion);
1217 if (rc < 0) {
1218 mutex_lock(&osdc->request_mutex);
1219 __cancel_request(req);
529cfcc4 1220 __unregister_request(osdc, req);
f24e9980 1221 mutex_unlock(&osdc->request_mutex);
529cfcc4 1222 dout("wait_request tid %llu canceled/timed out\n", req->r_tid);
f24e9980
SW
1223 return rc;
1224 }
1225
1226 dout("wait_request tid %llu result %d\n", req->r_tid, req->r_result);
1227 return req->r_result;
1228}
1229
/*
 * sync - wait for all in-flight write requests to flush.  Walk requests in
 * tid order so a steady stream of new requests cannot starve us; only wait
 * for tids that were outstanding when we started.
 */
void ceph_osdc_sync(struct ceph_osd_client *osdc)
{
	struct ceph_osd_request *req;
	u64 last_tid, next_tid = 0;

	mutex_lock(&osdc->request_mutex);
	last_tid = osdc->last_tid;
	while (1) {
		req = __lookup_request_ge(osdc, next_tid);
		if (!req)
			break;
		/* ignore requests submitted after we began */
		if (req->r_tid > last_tid)
			break;

		next_tid = req->r_tid + 1;
		/* reads don't need flushing */
		if ((req->r_flags & CEPH_OSD_FLAG_WRITE) == 0)
			continue;

		/* hold a ref so the request survives while we sleep
		 * without the mutex */
		ceph_osdc_get_request(req);
		mutex_unlock(&osdc->request_mutex);
		dout("sync waiting on tid %llu (last is %llu)\n",
		     req->r_tid, last_tid);
		wait_for_completion(&req->r_safe_completion);
		mutex_lock(&osdc->request_mutex);
		ceph_osdc_put_request(req);
	}
	mutex_unlock(&osdc->request_mutex);
	dout("sync done (thru tid %llu)\n", last_tid);
}
1262
/*
 * init, shutdown
 */

/*
 * Initialize an osd client: set up locks, rb-trees, LRU lists, delayed
 * work, and the request/message pools.  Returns 0 or a negative errno;
 * on failure all partially-created resources are torn down.
 */
int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
{
	int err;

	dout("init\n");
	osdc->client = client;
	osdc->osdmap = NULL;
	init_rwsem(&osdc->map_sem);
	init_completion(&osdc->map_waiters);
	osdc->last_requested_map = 0;
	mutex_init(&osdc->request_mutex);
	osdc->last_tid = 0;
	osdc->osds = RB_ROOT;
	INIT_LIST_HEAD(&osdc->osd_lru);
	osdc->requests = RB_ROOT;
	INIT_LIST_HEAD(&osdc->req_lru);
	osdc->num_requests = 0;
	INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
	INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);

	/* start reaping idle osd connections periodically */
	schedule_delayed_work(&osdc->osds_timeout_work,
	   round_jiffies_relative(osdc->client->mount_args->osd_idle_ttl * HZ));

	err = -ENOMEM;
	/* mempool guarantees forward progress for 'nofail' requests */
	osdc->req_mempool = mempool_create_kmalloc_pool(10,
					sizeof(struct ceph_osd_request));
	if (!osdc->req_mempool)
		goto out;

	err = ceph_msgpool_init(&osdc->msgpool_op, OSD_OP_FRONT_LEN, 10, true,
				"osd_op");
	if (err < 0)
		goto out_mempool;
	err = ceph_msgpool_init(&osdc->msgpool_op_reply,
				OSD_OPREPLY_FRONT_LEN, 10, true,
				"osd_op_reply");
	if (err < 0)
		goto out_msgpool;
	return 0;

out_msgpool:
	ceph_msgpool_destroy(&osdc->msgpool_op);
out_mempool:
	mempool_destroy(osdc->req_mempool);
out:
	return err;
}
1313
/*
 * Tear down an osd client: stop delayed work first so nothing races with
 * the teardown, then release the osdmap, osd connections, and pools.
 */
void ceph_osdc_stop(struct ceph_osd_client *osdc)
{
	cancel_delayed_work_sync(&osdc->timeout_work);
	cancel_delayed_work_sync(&osdc->osds_timeout_work);
	if (osdc->osdmap) {
		ceph_osdmap_destroy(osdc->osdmap);
		osdc->osdmap = NULL;
	}
	/* drop all cached osd connections, not just the old ones */
	remove_old_osds(osdc, 1);
	mempool_destroy(osdc->req_mempool);
	ceph_msgpool_destroy(&osdc->msgpool_op);
	ceph_msgpool_destroy(&osdc->msgpool_op_reply);
}
1327
1328/*
1329 * Read some contiguous pages. If we cross a stripe boundary, shorten
1330 * *plen. Return number of bytes read, or error.
1331 */
1332int ceph_osdc_readpages(struct ceph_osd_client *osdc,
1333 struct ceph_vino vino, struct ceph_file_layout *layout,
1334 u64 off, u64 *plen,
1335 u32 truncate_seq, u64 truncate_size,
1336 struct page **pages, int num_pages)
1337{
1338 struct ceph_osd_request *req;
1339 int rc = 0;
1340
1341 dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino,
1342 vino.snap, off, *plen);
1343 req = ceph_osdc_new_request(osdc, layout, vino, off, plen,
1344 CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
1345 NULL, 0, truncate_seq, truncate_size, NULL,
1346 false, 1);
a79832f2
SW
1347 if (!req)
1348 return -ENOMEM;
f24e9980
SW
1349
1350 /* it may be a short read due to an object boundary */
1351 req->r_pages = pages;
f24e9980
SW
1352
1353 dout("readpages final extent is %llu~%llu (%d pages)\n",
1354 off, *plen, req->r_num_pages);
1355
1356 rc = ceph_osdc_start_request(osdc, req, false);
1357 if (!rc)
1358 rc = ceph_osdc_wait_request(osdc, req);
1359
1360 ceph_osdc_put_request(req);
1361 dout("readpages result %d\n", rc);
1362 return rc;
1363}
1364
1365/*
1366 * do a synchronous write on N pages
1367 */
1368int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
1369 struct ceph_file_layout *layout,
1370 struct ceph_snap_context *snapc,
1371 u64 off, u64 len,
1372 u32 truncate_seq, u64 truncate_size,
1373 struct timespec *mtime,
1374 struct page **pages, int num_pages,
1375 int flags, int do_sync, bool nofail)
1376{
1377 struct ceph_osd_request *req;
1378 int rc = 0;
1379
1380 BUG_ON(vino.snap != CEPH_NOSNAP);
1381 req = ceph_osdc_new_request(osdc, layout, vino, off, &len,
1382 CEPH_OSD_OP_WRITE,
1383 flags | CEPH_OSD_FLAG_ONDISK |
1384 CEPH_OSD_FLAG_WRITE,
1385 snapc, do_sync,
1386 truncate_seq, truncate_size, mtime,
1387 nofail, 1);
a79832f2
SW
1388 if (!req)
1389 return -ENOMEM;
f24e9980
SW
1390
1391 /* it may be a short write due to an object boundary */
1392 req->r_pages = pages;
f24e9980
SW
1393 dout("writepages %llu~%llu (%d pages)\n", off, len,
1394 req->r_num_pages);
1395
1396 rc = ceph_osdc_start_request(osdc, req, nofail);
1397 if (!rc)
1398 rc = ceph_osdc_wait_request(osdc, req);
1399
1400 ceph_osdc_put_request(req);
1401 if (rc == 0)
1402 rc = len;
1403 dout("writepages result %d\n", rc);
1404 return rc;
1405}
1406
1407/*
1408 * handle incoming message
1409 */
1410static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
1411{
1412 struct ceph_osd *osd = con->private;
32c895e7 1413 struct ceph_osd_client *osdc;
f24e9980
SW
1414 int type = le16_to_cpu(msg->hdr.type);
1415
1416 if (!osd)
4a32f93d 1417 goto out;
32c895e7 1418 osdc = osd->o_osdc;
f24e9980
SW
1419
1420 switch (type) {
1421 case CEPH_MSG_OSD_MAP:
1422 ceph_osdc_handle_map(osdc, msg);
1423 break;
1424 case CEPH_MSG_OSD_OPREPLY:
350b1c32 1425 handle_reply(osdc, msg, con);
f24e9980
SW
1426 break;
1427
1428 default:
1429 pr_err("received unknown message type %d %s\n", type,
1430 ceph_msg_type_name(type));
1431 }
4a32f93d 1432out:
f24e9980
SW
1433 ceph_msg_put(msg);
1434}
1435
/*
 * lookup and return message for incoming reply. set up reply message
 * pages.
 *
 * Called with the tid from the reply header; returns the preallocated
 * reply message for the matching request (with a ref held), or NULL
 * with *skip set if the reply should be dropped.
 */
static struct ceph_msg *get_reply(struct ceph_connection *con,
				  struct ceph_msg_header *hdr,
				  int *skip)
{
	struct ceph_osd *osd = con->private;
	struct ceph_osd_client *osdc = osd->o_osdc;
	struct ceph_msg *m;
	struct ceph_osd_request *req;
	int front = le32_to_cpu(hdr->front_len);
	int data_len = le32_to_cpu(hdr->data_len);
	u64 tid;

	tid = le64_to_cpu(hdr->tid);
	mutex_lock(&osdc->request_mutex);
	req = __lookup_request(osdc, tid);
	if (!req) {
		/* request already completed or never existed; drop reply */
		*skip = 1;
		m = NULL;
		pr_info("get_reply unknown tid %llu from osd%d\n", tid,
			osd->o_osd);
		goto out;
	}

	if (req->r_con_filling_msg) {
		/* a previous connection was filling r_reply; revoke it so
		 * only this connection writes into the message */
		dout("get_reply revoking msg %p from old con %p\n",
		     req->r_reply, req->r_con_filling_msg);
		ceph_con_revoke_message(req->r_con_filling_msg, req->r_reply);
		ceph_con_put(req->r_con_filling_msg);
		req->r_con_filling_msg = NULL;
	}

	if (front > req->r_reply->front.iov_len) {
		/* preallocated reply is too small; replace it */
		pr_warning("get_reply front %d > preallocated %d\n",
			   front, (int)req->r_reply->front.iov_len);
		m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front, GFP_NOFS);
		if (!m)
			goto out;
		ceph_msg_put(req->r_reply);
		req->r_reply = m;
	}
	m = ceph_msg_get(req->r_reply);

	if (data_len > 0) {
		unsigned data_off = le16_to_cpu(hdr->data_off);
		int want = calc_pages_for(data_off & ~PAGE_MASK, data_len);

		if (unlikely(req->r_num_pages < want)) {
			/* reply carries more data than we have pages for */
			pr_warning("tid %lld reply %d > expected %d pages\n",
				   tid, want, m->nr_pages);
			*skip = 1;
			ceph_msg_put(m);
			m = NULL;
			goto out;
		}
		m->pages = req->r_pages;
		m->nr_pages = req->r_num_pages;
	}
	*skip = 0;
	/* remember which connection is filling this reply (holds a con ref) */
	req->r_con_filling_msg = ceph_con_get(con);
	dout("get_reply tid %lld %p\n", tid, m);

out:
	mutex_unlock(&osdc->request_mutex);
	return m;

}
1506
1507static struct ceph_msg *alloc_msg(struct ceph_connection *con,
1508 struct ceph_msg_header *hdr,
1509 int *skip)
1510{
1511 struct ceph_osd *osd = con->private;
1512 int type = le16_to_cpu(hdr->type);
1513 int front = le32_to_cpu(hdr->front_len);
1514
1515 switch (type) {
1516 case CEPH_MSG_OSD_MAP:
34d23762 1517 return ceph_msg_new(type, front, GFP_NOFS);
5b3a4db3
SW
1518 case CEPH_MSG_OSD_OPREPLY:
1519 return get_reply(con, hdr, skip);
1520 default:
1521 pr_info("alloc_msg unexpected msg type %d from osd%d\n", type,
1522 osd->o_osd);
1523 *skip = 1;
1524 return NULL;
1525 }
f24e9980
SW
1526}
1527
1528/*
1529 * Wrappers to refcount containing ceph_osd struct
1530 */
1531static struct ceph_connection *get_osd_con(struct ceph_connection *con)
1532{
1533 struct ceph_osd *osd = con->private;
1534 if (get_osd(osd))
1535 return con;
1536 return NULL;
1537}
1538
1539static void put_osd_con(struct ceph_connection *con)
1540{
1541 struct ceph_osd *osd = con->private;
1542 put_osd(osd);
1543}
1544
4e7a5dcd
SW
/*
 * authentication
 */

/*
 * Supply the authorizer for this osd connection.  Creates one lazily (or
 * recreates it when @force_new is set) and hands back pointers into the
 * osd's cached authorizer buffers; the buffers remain owned by the osd.
 */
static int get_authorizer(struct ceph_connection *con,
			  void **buf, int *len, int *proto,
			  void **reply_buf, int *reply_len, int force_new)
{
	struct ceph_osd *o = con->private;
	struct ceph_osd_client *osdc = o->o_osdc;
	struct ceph_auth_client *ac = osdc->client->monc.auth;
	int ret = 0;

	if (force_new && o->o_authorizer) {
		/* caller wants a fresh authorizer (e.g. the old one was
		 * rejected); throw away the cached one */
		ac->ops->destroy_authorizer(ac, o->o_authorizer);
		o->o_authorizer = NULL;
	}
	if (o->o_authorizer == NULL) {
		ret = ac->ops->create_authorizer(
			ac, CEPH_ENTITY_TYPE_OSD,
			&o->o_authorizer,
			&o->o_authorizer_buf,
			&o->o_authorizer_buf_len,
			&o->o_authorizer_reply_buf,
			&o->o_authorizer_reply_buf_len);
		if (ret)
			return ret;
	}

	*proto = ac->protocol;
	*buf = o->o_authorizer_buf;
	*len = o->o_authorizer_buf_len;
	*reply_buf = o->o_authorizer_reply_buf;
	*reply_len = o->o_authorizer_reply_buf_len;
	return 0;
}
1580
1581
1582static int verify_authorizer_reply(struct ceph_connection *con, int len)
1583{
1584 struct ceph_osd *o = con->private;
1585 struct ceph_osd_client *osdc = o->o_osdc;
1586 struct ceph_auth_client *ac = osdc->client->monc.auth;
1587
1588 return ac->ops->verify_authorizer_reply(ac, o->o_authorizer, len);
1589}
1590
9bd2e6f8
SW
1591static int invalidate_authorizer(struct ceph_connection *con)
1592{
1593 struct ceph_osd *o = con->private;
1594 struct ceph_osd_client *osdc = o->o_osdc;
1595 struct ceph_auth_client *ac = osdc->client->monc.auth;
1596
1597 if (ac->ops->invalidate_authorizer)
1598 ac->ops->invalidate_authorizer(ac, CEPH_ENTITY_TYPE_OSD);
1599
1600 return ceph_monc_validate_auth(&osdc->client->monc);
1601}
4e7a5dcd 1602
9e32789f 1603static const struct ceph_connection_operations osd_con_ops = {
f24e9980
SW
1604 .get = get_osd_con,
1605 .put = put_osd_con,
1606 .dispatch = dispatch,
4e7a5dcd
SW
1607 .get_authorizer = get_authorizer,
1608 .verify_authorizer_reply = verify_authorizer_reply,
9bd2e6f8 1609 .invalidate_authorizer = invalidate_authorizer,
f24e9980 1610 .alloc_msg = alloc_msg,
81b024e7 1611 .fault = osd_reset,
f24e9980 1612};