/*
 * [linux-2.6-block.git] fs/ceph/osd_client.c
 */
#include "ceph_debug.h"

#include <linux/err.h>
#include <linux/highmem.h>
#include <linux/mm.h>
#include <linux/pagemap.h>
#include <linux/slab.h>
#include <linux/uaccess.h>

#include "super.h"
#include "osd_client.h"
#include "messenger.h"
#include "decode.h"
#include "auth.h"

#define OSD_OP_FRONT_LEN	4096
#define OSD_OPREPLY_FRONT_LEN	512

static const struct ceph_connection_operations osd_con_ops;

static void kick_requests(struct ceph_osd_client *osdc, struct ceph_osd *osd);

/*
 * Implement client access to distributed object storage cluster.
 *
 * All data objects are stored within a cluster/cloud of OSDs, or
 * "object storage devices."  (Note that Ceph OSDs have _nothing_ to
 * do with the T10 OSD extensions to SCSI.)  Ceph OSDs are simply
 * remote daemons serving up and coordinating consistent and safe
 * access to storage.
 *
 * Cluster membership and the mapping of data objects onto storage devices
 * are described by the osd map.
 *
 * We keep track of pending OSD requests (read, write), resubmit
 * requests to different OSDs when the cluster topology/data layout
 * change, or retry the affected requests when the communications
 * channel with an OSD is reset.
 */
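
/*
 * Illustrative sketch (not in the original source): the typical life of
 * a request as driven by the helpers below.  Error handling is elided.
 *
 *	req = ceph_osdc_new_request(osdc, layout, vino, off, &len, opcode,
 *				    flags, snapc, do_sync, truncate_seq,
 *				    truncate_size, mtime, use_mempool, 1);
 *	rc = ceph_osdc_start_request(osdc, req, false);	  register + send
 *	if (!rc)
 *		rc = ceph_osdc_wait_request(osdc, req);	  wait for the reply
 *	ceph_osdc_put_request(req);			  drop our reference
 */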

/*
 * calculate the mapping of a file extent onto an object, and fill out the
 * request accordingly.  shorten extent as necessary if it crosses an
 * object boundary.
 *
 * fill osd op in request message.
 */
static void calc_layout(struct ceph_osd_client *osdc,
			struct ceph_vino vino, struct ceph_file_layout *layout,
			u64 off, u64 *plen,
			struct ceph_osd_request *req)
{
	struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
	struct ceph_osd_op *op = (void *)(reqhead + 1);
	u64 orig_len = *plen;
	u64 objoff, objlen;    /* extent in object */
	u64 bno;

	reqhead->snapid = cpu_to_le64(vino.snap);

	/* object extent? */
	ceph_calc_file_object_mapping(layout, off, plen, &bno,
				      &objoff, &objlen);
	if (*plen < orig_len)
		dout(" skipping last %llu, final file extent %llu~%llu\n",
		     orig_len - *plen, off, *plen);

	sprintf(req->r_oid, "%llx.%08llx", vino.ino, bno);
	req->r_oid_len = strlen(req->r_oid);

	op->extent.offset = cpu_to_le64(objoff);
	op->extent.length = cpu_to_le64(objlen);
	req->r_num_pages = calc_pages_for(off, *plen);

	dout("calc_layout %s (%d) %llu~%llu (%d pages)\n",
	     req->r_oid, req->r_oid_len, objoff, objlen, req->r_num_pages);
}
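
/*
 * Worked example (illustrative, not in the original source), assuming
 * the common layout of stripe_count == 1 with 4 MB (0x400000) objects:
 * a file extent off=0x300000, *plen=0x200000 crosses the first object
 * boundary, so ceph_calc_file_object_mapping() returns bno=0,
 * objoff=0x300000, objlen=0x100000 and trims *plen to 0x100000; the
 * caller must issue the remaining 0x100000 as a separate request.
 * For ino 0x10000000001 the oid becomes "10000000001.00000000".
 */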

/*
 * requests
 */
void ceph_osdc_release_request(struct kref *kref)
{
	struct ceph_osd_request *req = container_of(kref,
						    struct ceph_osd_request,
						    r_kref);

	if (req->r_request)
		ceph_msg_put(req->r_request);
	if (req->r_reply)
		ceph_msg_put(req->r_reply);
	if (req->r_con_filling_msg) {
		dout("release_request revoking pages %p from con %p\n",
		     req->r_pages, req->r_con_filling_msg);
		ceph_con_revoke_message(req->r_con_filling_msg,
					req->r_reply);
		ceph_con_put(req->r_con_filling_msg);
	}
	if (req->r_own_pages)
		ceph_release_page_vector(req->r_pages,
					 req->r_num_pages);
	ceph_put_snap_context(req->r_snapc);
	if (req->r_mempool)
		mempool_free(req, req->r_osdc->req_mempool);
	else
		kfree(req);
}

/*
 * build new request AND message, calculate layout, and adjust file
 * extent as needed.
 *
 * if the file was recently truncated, we include information about its
 * old and new size so that the object can be updated appropriately.  (we
 * avoid synchronously deleting truncated objects because it's slow.)
 *
 * if @do_sync, include a 'startsync' command so that the osd will flush
 * data quickly.
 */
struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
					       struct ceph_file_layout *layout,
					       struct ceph_vino vino,
					       u64 off, u64 *plen,
					       int opcode, int flags,
					       struct ceph_snap_context *snapc,
					       int do_sync,
					       u32 truncate_seq,
					       u64 truncate_size,
					       struct timespec *mtime,
					       bool use_mempool, int num_reply)
{
	struct ceph_osd_request *req;
	struct ceph_msg *msg;
	struct ceph_osd_request_head *head;
	struct ceph_osd_op *op;
	void *p;
	int num_op = 1 + do_sync;
	size_t msg_size = sizeof(*head) + num_op*sizeof(*op);
	int i;

	if (use_mempool) {
		req = mempool_alloc(osdc->req_mempool, GFP_NOFS);
		memset(req, 0, sizeof(*req));
	} else {
		req = kzalloc(sizeof(*req), GFP_NOFS);
	}
	if (req == NULL)
		return ERR_PTR(-ENOMEM);

	req->r_osdc = osdc;
	req->r_mempool = use_mempool;
	kref_init(&req->r_kref);
	init_completion(&req->r_completion);
	init_completion(&req->r_safe_completion);
	INIT_LIST_HEAD(&req->r_unsafe_item);
	req->r_flags = flags;

	WARN_ON((flags & (CEPH_OSD_FLAG_READ|CEPH_OSD_FLAG_WRITE)) == 0);

	/* create reply message */
	if (use_mempool)
		msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
	else
		msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY,
				   OSD_OPREPLY_FRONT_LEN, 0, 0, NULL);
	if (IS_ERR(msg)) {
		ceph_osdc_put_request(req);
		return ERR_PTR(PTR_ERR(msg));
	}
	req->r_reply = msg;

	/* create request message; allow space for oid */
	msg_size += 40;
	if (snapc)
		msg_size += sizeof(u64) * snapc->num_snaps;
	if (use_mempool)
		msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
	else
		msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, 0, 0, NULL);
	if (IS_ERR(msg)) {
		ceph_osdc_put_request(req);
		return ERR_PTR(PTR_ERR(msg));
	}
	msg->hdr.type = cpu_to_le16(CEPH_MSG_OSD_OP);
	memset(msg->front.iov_base, 0, msg->front.iov_len);
	head = msg->front.iov_base;
	op = (void *)(head + 1);
	p = (void *)(op + num_op);

	req->r_request = msg;
	req->r_snapc = ceph_get_snap_context(snapc);

	head->client_inc = cpu_to_le32(1); /* always, for now. */
	head->flags = cpu_to_le32(flags);
	if (flags & CEPH_OSD_FLAG_WRITE)
		ceph_encode_timespec(&head->mtime, mtime);
	head->num_ops = cpu_to_le16(num_op);
	op->op = cpu_to_le16(opcode);

	/* calculate max write size */
	calc_layout(osdc, vino, layout, off, plen, req);
	req->r_file_layout = *layout; /* keep a copy */

	if (flags & CEPH_OSD_FLAG_WRITE) {
		req->r_request->hdr.data_off = cpu_to_le16(off);
		req->r_request->hdr.data_len = cpu_to_le32(*plen);
		op->payload_len = cpu_to_le32(*plen);
	}
	op->extent.truncate_size = cpu_to_le64(truncate_size);
	op->extent.truncate_seq = cpu_to_le32(truncate_seq);

	/* fill in oid */
	head->object_len = cpu_to_le32(req->r_oid_len);
	memcpy(p, req->r_oid, req->r_oid_len);
	p += req->r_oid_len;

	if (do_sync) {
		op++;
		op->op = cpu_to_le16(CEPH_OSD_OP_STARTSYNC);
	}
	if (snapc) {
		head->snap_seq = cpu_to_le64(snapc->seq);
		head->num_snaps = cpu_to_le32(snapc->num_snaps);
		for (i = 0; i < snapc->num_snaps; i++) {
			put_unaligned_le64(snapc->snaps[i], p);
			p += sizeof(u64);
		}
	}

	BUG_ON(p > msg->front.iov_base + msg->front.iov_len);
	msg_size = p - msg->front.iov_base;
	msg->front.iov_len = msg_size;
	msg->hdr.front_len = cpu_to_le32(msg_size);
	return req;
}
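
/*
 * Resulting front section of the request message built above (a sketch
 * derived from the code, not a comment in the original source):
 *
 *	struct ceph_osd_request_head
 *	struct ceph_osd_op x num_op	(1, plus STARTSYNC if do_sync)
 *	object name			(object_len bytes, within the
 *					 40 reserved above)
 *	u64 snap ids x num_snaps	(if snapc was given)
 */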

/*
 * We keep osd requests in an rbtree, sorted by ->r_tid.
 */
static void __insert_request(struct ceph_osd_client *osdc,
			     struct ceph_osd_request *new)
{
	struct rb_node **p = &osdc->requests.rb_node;
	struct rb_node *parent = NULL;
	struct ceph_osd_request *req = NULL;

	while (*p) {
		parent = *p;
		req = rb_entry(parent, struct ceph_osd_request, r_node);
		if (new->r_tid < req->r_tid)
			p = &(*p)->rb_left;
		else if (new->r_tid > req->r_tid)
			p = &(*p)->rb_right;
		else
			BUG();
	}

	rb_link_node(&new->r_node, parent, p);
	rb_insert_color(&new->r_node, &osdc->requests);
}

static struct ceph_osd_request *__lookup_request(struct ceph_osd_client *osdc,
						 u64 tid)
{
	struct ceph_osd_request *req;
	struct rb_node *n = osdc->requests.rb_node;

	while (n) {
		req = rb_entry(n, struct ceph_osd_request, r_node);
		if (tid < req->r_tid)
			n = n->rb_left;
		else if (tid > req->r_tid)
			n = n->rb_right;
		else
			return req;
	}
	return NULL;
}

static struct ceph_osd_request *
__lookup_request_ge(struct ceph_osd_client *osdc,
		    u64 tid)
{
	struct ceph_osd_request *req;
	struct rb_node *n = osdc->requests.rb_node;

	while (n) {
		req = rb_entry(n, struct ceph_osd_request, r_node);
		if (tid < req->r_tid) {
			if (!n->rb_left)
				return req;
			n = n->rb_left;
		} else if (tid > req->r_tid) {
			n = n->rb_right;
		} else {
			return req;
		}
	}
	return NULL;
}


/*
 * If the osd connection drops, we need to resubmit all requests.
 */
static void osd_reset(struct ceph_connection *con)
{
	struct ceph_osd *osd = con->private;
	struct ceph_osd_client *osdc;

	if (!osd)
		return;
	dout("osd_reset osd%d\n", osd->o_osd);
	osdc = osd->o_osdc;
	down_read(&osdc->map_sem);
	kick_requests(osdc, osd);
	up_read(&osdc->map_sem);
}

/*
 * Track open sessions with osds.
 */
static struct ceph_osd *create_osd(struct ceph_osd_client *osdc)
{
	struct ceph_osd *osd;

	osd = kzalloc(sizeof(*osd), GFP_NOFS);
	if (!osd)
		return NULL;

	atomic_set(&osd->o_ref, 1);
	osd->o_osdc = osdc;
	INIT_LIST_HEAD(&osd->o_requests);
	INIT_LIST_HEAD(&osd->o_osd_lru);
	osd->o_incarnation = 1;

	ceph_con_init(osdc->client->msgr, &osd->o_con);
	osd->o_con.private = osd;
	osd->o_con.ops = &osd_con_ops;
	osd->o_con.peer_name.type = CEPH_ENTITY_TYPE_OSD;

	return osd;
}

static struct ceph_osd *get_osd(struct ceph_osd *osd)
{
	if (atomic_inc_not_zero(&osd->o_ref)) {
		dout("get_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref)-1,
		     atomic_read(&osd->o_ref));
		return osd;
	} else {
		dout("get_osd %p FAIL\n", osd);
		return NULL;
	}
}

static void put_osd(struct ceph_osd *osd)
{
	dout("put_osd %p %d -> %d\n", osd, atomic_read(&osd->o_ref),
	     atomic_read(&osd->o_ref) - 1);
	if (atomic_dec_and_test(&osd->o_ref))
		kfree(osd);
}

/*
 * remove an osd from our map
 */
static void __remove_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
{
	dout("__remove_osd %p\n", osd);
	BUG_ON(!list_empty(&osd->o_requests));
	rb_erase(&osd->o_node, &osdc->osds);
	list_del_init(&osd->o_osd_lru);
	ceph_con_close(&osd->o_con);
	put_osd(osd);
}

static void __move_osd_to_lru(struct ceph_osd_client *osdc,
			      struct ceph_osd *osd)
{
	dout("__move_osd_to_lru %p\n", osd);
	BUG_ON(!list_empty(&osd->o_osd_lru));
	list_add_tail(&osd->o_osd_lru, &osdc->osd_lru);
	osd->lru_ttl = jiffies + osdc->client->mount_args->osd_idle_ttl * HZ;
}

static void __remove_osd_from_lru(struct ceph_osd *osd)
{
	dout("__remove_osd_from_lru %p\n", osd);
	if (!list_empty(&osd->o_osd_lru))
		list_del_init(&osd->o_osd_lru);
}

static void remove_old_osds(struct ceph_osd_client *osdc, int remove_all)
{
	struct ceph_osd *osd, *nosd;

	dout("remove_old_osds %p\n", osdc);
	mutex_lock(&osdc->request_mutex);
	list_for_each_entry_safe(osd, nosd, &osdc->osd_lru, o_osd_lru) {
		if (!remove_all && time_before(jiffies, osd->lru_ttl))
			break;
		__remove_osd(osdc, osd);
	}
	mutex_unlock(&osdc->request_mutex);
}

/*
 * reset osd connect
 */
static int __reset_osd(struct ceph_osd_client *osdc, struct ceph_osd *osd)
{
	int ret = 0;

	dout("__reset_osd %p osd%d\n", osd, osd->o_osd);
	if (list_empty(&osd->o_requests)) {
		__remove_osd(osdc, osd);
	} else {
		ceph_con_close(&osd->o_con);
		ceph_con_open(&osd->o_con, &osdc->osdmap->osd_addr[osd->o_osd]);
		osd->o_incarnation++;
	}
	return ret;
}

static void __insert_osd(struct ceph_osd_client *osdc, struct ceph_osd *new)
{
	struct rb_node **p = &osdc->osds.rb_node;
	struct rb_node *parent = NULL;
	struct ceph_osd *osd = NULL;

	while (*p) {
		parent = *p;
		osd = rb_entry(parent, struct ceph_osd, o_node);
		if (new->o_osd < osd->o_osd)
			p = &(*p)->rb_left;
		else if (new->o_osd > osd->o_osd)
			p = &(*p)->rb_right;
		else
			BUG();
	}

	rb_link_node(&new->o_node, parent, p);
	rb_insert_color(&new->o_node, &osdc->osds);
}

static struct ceph_osd *__lookup_osd(struct ceph_osd_client *osdc, int o)
{
	struct ceph_osd *osd;
	struct rb_node *n = osdc->osds.rb_node;

	while (n) {
		osd = rb_entry(n, struct ceph_osd, o_node);
		if (o < osd->o_osd)
			n = n->rb_left;
		else if (o > osd->o_osd)
			n = n->rb_right;
		else
			return osd;
	}
	return NULL;
}


/*
 * Register request, assign tid.  If this is the first request, set up
 * the timeout event.
 */
static void register_request(struct ceph_osd_client *osdc,
			     struct ceph_osd_request *req)
{
	mutex_lock(&osdc->request_mutex);
	req->r_tid = ++osdc->last_tid;
	req->r_request->hdr.tid = cpu_to_le64(req->r_tid);

	dout("register_request %p tid %lld\n", req, req->r_tid);
	__insert_request(osdc, req);
	ceph_osdc_get_request(req);
	osdc->num_requests++;

	req->r_timeout_stamp =
		jiffies + osdc->client->mount_args->osd_timeout*HZ;

	if (osdc->num_requests == 1) {
		osdc->timeout_tid = req->r_tid;
		dout(" timeout on tid %llu at %lu\n", req->r_tid,
		     req->r_timeout_stamp);
		schedule_delayed_work(&osdc->timeout_work,
		      round_jiffies_relative(req->r_timeout_stamp - jiffies));
	}
	mutex_unlock(&osdc->request_mutex);
}

/*
 * called under osdc->request_mutex
 */
static void __unregister_request(struct ceph_osd_client *osdc,
				 struct ceph_osd_request *req)
{
	dout("__unregister_request %p tid %lld\n", req, req->r_tid);
	rb_erase(&req->r_node, &osdc->requests);
	osdc->num_requests--;

	if (req->r_osd) {
		/* make sure the original request isn't in flight. */
		ceph_con_revoke(&req->r_osd->o_con, req->r_request);

		list_del_init(&req->r_osd_item);
		if (list_empty(&req->r_osd->o_requests))
			__move_osd_to_lru(osdc, req->r_osd);
		req->r_osd = NULL;
	}

	ceph_osdc_put_request(req);

	if (req->r_tid == osdc->timeout_tid) {
		if (osdc->num_requests == 0) {
			dout("no requests, canceling timeout\n");
			osdc->timeout_tid = 0;
			cancel_delayed_work(&osdc->timeout_work);
		} else {
			req = rb_entry(rb_first(&osdc->requests),
				       struct ceph_osd_request, r_node);
			osdc->timeout_tid = req->r_tid;
			dout("rescheduled timeout on tid %llu at %lu\n",
			     req->r_tid, req->r_timeout_stamp);
			schedule_delayed_work(&osdc->timeout_work,
			      round_jiffies_relative(req->r_timeout_stamp -
						     jiffies));
		}
	}
}

/*
 * Cancel a previously queued request message
 */
static void __cancel_request(struct ceph_osd_request *req)
{
	if (req->r_sent) {
		ceph_con_revoke(&req->r_osd->o_con, req->r_request);
		req->r_sent = 0;
	}
}

/*
 * Pick an osd (the first 'up' osd in the pg), allocate the osd struct
 * (as needed), and set the request r_osd appropriately.  If there is
 * no up osd, set r_osd to NULL.
 *
 * Return 0 if unchanged, 1 if changed, or negative on error.
 *
 * Caller should hold map_sem for read and request_mutex.
 */
static int __map_osds(struct ceph_osd_client *osdc,
		      struct ceph_osd_request *req)
{
	struct ceph_osd_request_head *reqhead = req->r_request->front.iov_base;
	struct ceph_pg pgid;
	int o = -1;
	int err;

	dout("map_osds %p tid %lld\n", req, req->r_tid);
	err = ceph_calc_object_layout(&reqhead->layout, req->r_oid,
				      &req->r_file_layout, osdc->osdmap);
	if (err)
		return err;
	pgid = reqhead->layout.ol_pgid;
	req->r_pgid = pgid;

	o = ceph_calc_pg_primary(osdc->osdmap, pgid);

	if ((req->r_osd && req->r_osd->o_osd == o &&
	     req->r_sent >= req->r_osd->o_incarnation) ||
	    (req->r_osd == NULL && o == -1))
		return 0;  /* no change */

	dout("map_osds tid %llu pgid %d.%x osd%d (was osd%d)\n",
	     req->r_tid, le32_to_cpu(pgid.pool), le16_to_cpu(pgid.ps), o,
	     req->r_osd ? req->r_osd->o_osd : -1);

	if (req->r_osd) {
		__cancel_request(req);
		list_del_init(&req->r_osd_item);
		req->r_osd = NULL;
	}

	req->r_osd = __lookup_osd(osdc, o);
	if (!req->r_osd && o >= 0) {
		err = -ENOMEM;
		req->r_osd = create_osd(osdc);
		if (!req->r_osd)
			goto out;

		dout("map_osds osd %p is osd%d\n", req->r_osd, o);
		req->r_osd->o_osd = o;
		req->r_osd->o_con.peer_name.num = cpu_to_le64(o);
		__insert_osd(osdc, req->r_osd);

		ceph_con_open(&req->r_osd->o_con, &osdc->osdmap->osd_addr[o]);
	}

	if (req->r_osd) {
		__remove_osd_from_lru(req->r_osd);
		list_add(&req->r_osd_item, &req->r_osd->o_requests);
	}
	err = 1;   /* osd changed */

out:
	return err;
}
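
/*
 * Addressing recap (explanatory note, not in the original source):
 * an object name is hashed onto a placement group
 * (ceph_calc_object_layout() fills reqhead->layout.ol_pgid), and the pg
 * is then mapped onto a primary osd against the current osdmap via
 * ceph_calc_pg_primary().  Requests are always directed at the pg's
 * primary osd.
 */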

/*
 * caller should hold map_sem (for read) and request_mutex
 */
static int __send_request(struct ceph_osd_client *osdc,
			  struct ceph_osd_request *req)
{
	struct ceph_osd_request_head *reqhead;
	int err;

	err = __map_osds(osdc, req);
	if (err < 0)
		return err;
	if (req->r_osd == NULL) {
		dout("send_request %p no up osds in pg\n", req);
		ceph_monc_request_next_osdmap(&osdc->client->monc);
		return 0;
	}

	dout("send_request %p tid %llu to osd%d flags %d\n",
	     req, req->r_tid, req->r_osd->o_osd, req->r_flags);

	reqhead = req->r_request->front.iov_base;
	reqhead->osdmap_epoch = cpu_to_le32(osdc->osdmap->epoch);
	reqhead->flags |= cpu_to_le32(req->r_flags);  /* e.g., RETRY */
	reqhead->reassert_version = req->r_reassert_version;

	req->r_timeout_stamp = jiffies+osdc->client->mount_args->osd_timeout*HZ;

	ceph_msg_get(req->r_request); /* send consumes a ref */
	ceph_con_send(&req->r_osd->o_con, req->r_request);
	req->r_sent = req->r_osd->o_incarnation;
	return 0;
}

/*
 * Timeout callback, called every N seconds when 1 or more osd
 * requests have been active for more than N seconds.  When this
 * happens, we ping all OSDs with requests that have timed out to
 * ensure any communications channel reset is detected.  Reset the
 * request timeouts another N seconds in the future as we go.
 * Reschedule the timeout event another N seconds in the future (unless
 * there are no open requests).
 */
static void handle_timeout(struct work_struct *work)
{
	struct ceph_osd_client *osdc =
		container_of(work, struct ceph_osd_client, timeout_work.work);
	struct ceph_osd_request *req;
	struct ceph_osd *osd;
	unsigned long timeout = osdc->client->mount_args->osd_timeout * HZ;
	unsigned long next_timeout = timeout + jiffies;
	struct rb_node *p;

	dout("timeout\n");
	down_read(&osdc->map_sem);

	ceph_monc_request_next_osdmap(&osdc->client->monc);

	mutex_lock(&osdc->request_mutex);
	for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
		req = rb_entry(p, struct ceph_osd_request, r_node);

		if (req->r_resend) {
			int err;

			dout("osdc resending prev failed %lld\n", req->r_tid);
			err = __send_request(osdc, req);
			if (err)
				dout("osdc failed again on %lld\n", req->r_tid);
			else
				req->r_resend = false;
			continue;
		}
	}
	for (p = rb_first(&osdc->osds); p; p = rb_next(p)) {
		osd = rb_entry(p, struct ceph_osd, o_node);
		if (list_empty(&osd->o_requests))
			continue;
		req = list_first_entry(&osd->o_requests,
				       struct ceph_osd_request, r_osd_item);
		if (time_before(jiffies, req->r_timeout_stamp))
			continue;

		dout(" tid %llu (at least) timed out on osd%d\n",
		     req->r_tid, osd->o_osd);
		req->r_timeout_stamp = next_timeout;
		ceph_con_keepalive(&osd->o_con);
	}

	if (osdc->timeout_tid)
		schedule_delayed_work(&osdc->timeout_work,
				      round_jiffies_relative(timeout));

	mutex_unlock(&osdc->request_mutex);

	up_read(&osdc->map_sem);
}

static void handle_osds_timeout(struct work_struct *work)
{
	struct ceph_osd_client *osdc =
		container_of(work, struct ceph_osd_client,
			     osds_timeout_work.work);
	unsigned long delay =
		osdc->client->mount_args->osd_idle_ttl * HZ >> 2; /* ttl/4 */

	dout("osds timeout\n");
	down_read(&osdc->map_sem);
	remove_old_osds(osdc, 0);
	up_read(&osdc->map_sem);

	schedule_delayed_work(&osdc->osds_timeout_work,
			      round_jiffies_relative(delay));
}

/*
 * handle osd op reply.  either call the callback if it is specified,
 * or do the completion to wake up the waiting thread.
 */
static void handle_reply(struct ceph_osd_client *osdc, struct ceph_msg *msg,
			 struct ceph_connection *con)
{
	struct ceph_osd_reply_head *rhead = msg->front.iov_base;
	struct ceph_osd_request *req;
	u64 tid;
	int numops, object_len, flags;

	tid = le64_to_cpu(msg->hdr.tid);
	if (msg->front.iov_len < sizeof(*rhead))
		goto bad;
	numops = le32_to_cpu(rhead->num_ops);
	object_len = le32_to_cpu(rhead->object_len);
	if (msg->front.iov_len != sizeof(*rhead) + object_len +
	    numops * sizeof(struct ceph_osd_op))
		goto bad;
	dout("handle_reply %p tid %llu\n", msg, tid);

	/* lookup */
	mutex_lock(&osdc->request_mutex);
	req = __lookup_request(osdc, tid);
	if (req == NULL) {
		dout("handle_reply tid %llu dne\n", tid);
		mutex_unlock(&osdc->request_mutex);
		return;
	}
	ceph_osdc_get_request(req);
	flags = le32_to_cpu(rhead->flags);

	/*
	 * if this connection filled our message, drop our reference now, to
	 * avoid a (safe but slower) revoke later.
	 */
	if (req->r_con_filling_msg == con && req->r_reply == msg) {
		dout(" dropping con_filling_msg ref %p\n", con);
		req->r_con_filling_msg = NULL;
		ceph_con_put(con);
	}

	if (!req->r_got_reply) {
		unsigned bytes;

		req->r_result = le32_to_cpu(rhead->result);
		bytes = le32_to_cpu(msg->hdr.data_len);
		dout("handle_reply result %d bytes %d\n", req->r_result,
		     bytes);
		if (req->r_result == 0)
			req->r_result = bytes;

		/* in case this is a write and we need to replay, */
		req->r_reassert_version = rhead->reassert_version;

		req->r_got_reply = 1;
	} else if ((flags & CEPH_OSD_FLAG_ONDISK) == 0) {
		dout("handle_reply tid %llu dup ack\n", tid);
		mutex_unlock(&osdc->request_mutex);
		goto done;
	}

	dout("handle_reply tid %llu flags %d\n", tid, flags);

	/* either this is a read, or we got the safe response */
	if ((flags & CEPH_OSD_FLAG_ONDISK) ||
	    ((flags & CEPH_OSD_FLAG_WRITE) == 0))
		__unregister_request(osdc, req);

	mutex_unlock(&osdc->request_mutex);

	if (req->r_callback)
		req->r_callback(req, msg);
	else
		complete(&req->r_completion);

	if (flags & CEPH_OSD_FLAG_ONDISK) {
		if (req->r_safe_callback)
			req->r_safe_callback(req, msg);
		complete(&req->r_safe_completion);  /* fsync waiter */
	}

done:
	ceph_osdc_put_request(req);
	return;

bad:
	pr_err("corrupt osd_op_reply got %d %d expected %d\n",
	       (int)msg->front.iov_len, le32_to_cpu(msg->hdr.front_len),
	       (int)sizeof(*rhead));
	ceph_msg_dump(msg);
}


/*
 * Resubmit osd requests whose osd or osd address has changed.  Request
 * a new osd map if osds are down, or we are otherwise unable to determine
 * how to direct a request.
 *
 * Close connections to down osds.
 *
 * If @kickosd is specified, resubmit requests for that specific osd.
 *
 * Caller should hold map_sem for read and request_mutex.
 */
static void kick_requests(struct ceph_osd_client *osdc,
			  struct ceph_osd *kickosd)
{
	struct ceph_osd_request *req;
	struct rb_node *p, *n;
	int needmap = 0;
	int err;

	dout("kick_requests osd%d\n", kickosd ? kickosd->o_osd : -1);
	mutex_lock(&osdc->request_mutex);
	if (kickosd) {
		__reset_osd(osdc, kickosd);
	} else {
		for (p = rb_first(&osdc->osds); p; p = n) {
			struct ceph_osd *osd =
				rb_entry(p, struct ceph_osd, o_node);

			n = rb_next(p);
			if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) ||
			    memcmp(&osd->o_con.peer_addr,
				   ceph_osd_addr(osdc->osdmap,
						 osd->o_osd),
				   sizeof(struct ceph_entity_addr)) != 0)
				__reset_osd(osdc, osd);
		}
	}

	for (p = rb_first(&osdc->requests); p; p = rb_next(p)) {
		req = rb_entry(p, struct ceph_osd_request, r_node);

		if (req->r_resend) {
			dout(" r_resend set on tid %llu\n", req->r_tid);
			__cancel_request(req);
			goto kick;
		}
		if (req->r_osd && kickosd == req->r_osd) {
			__cancel_request(req);
			goto kick;
		}

		err = __map_osds(osdc, req);
		if (err == 0)
			continue;  /* no change */
		if (err < 0) {
			/*
			 * FIXME: really, we should set the request
			 * error and fail if this isn't a 'nofail'
			 * request, but that's a fair bit more
			 * complicated to do.  So retry!
			 */
			dout(" setting r_resend on %llu\n", req->r_tid);
			req->r_resend = true;
			continue;
		}
		if (req->r_osd == NULL) {
			dout("tid %llu maps to no valid osd\n", req->r_tid);
			needmap++;  /* request a newer map */
			continue;
		}

kick:
		dout("kicking %p tid %llu osd%d\n", req, req->r_tid,
		     req->r_osd->o_osd);
		req->r_flags |= CEPH_OSD_FLAG_RETRY;
		err = __send_request(osdc, req);
		if (err) {
			dout(" setting r_resend on %llu\n", req->r_tid);
			req->r_resend = true;
		}
	}
	mutex_unlock(&osdc->request_mutex);

	if (needmap) {
		dout("%d requests for down osds, need new map\n", needmap);
		ceph_monc_request_next_osdmap(&osdc->client->monc);
	}
}

/*
 * Process updated osd map.
 *
 * The message contains any number of incremental and full maps, normally
 * indicating some sort of topology change in the cluster.  Kick requests
 * off to different OSDs as needed.
 */
void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
{
	void *p, *end, *next;
	u32 nr_maps, maplen;
	u32 epoch;
	struct ceph_osdmap *newmap = NULL, *oldmap;
	int err;
	struct ceph_fsid fsid;

	dout("handle_map have %u\n", osdc->osdmap ? osdc->osdmap->epoch : 0);
	p = msg->front.iov_base;
	end = p + msg->front.iov_len;

	/* verify fsid */
	ceph_decode_need(&p, end, sizeof(fsid), bad);
	ceph_decode_copy(&p, &fsid, sizeof(fsid));
	if (ceph_check_fsid(osdc->client, &fsid) < 0)
		return;

	down_write(&osdc->map_sem);

	/* incremental maps */
	ceph_decode_32_safe(&p, end, nr_maps, bad);
	dout(" %d inc maps\n", nr_maps);
	while (nr_maps > 0) {
		ceph_decode_need(&p, end, 2*sizeof(u32), bad);
		epoch = ceph_decode_32(&p);
		maplen = ceph_decode_32(&p);
		ceph_decode_need(&p, end, maplen, bad);
		next = p + maplen;
		if (osdc->osdmap && osdc->osdmap->epoch+1 == epoch) {
			dout("applying incremental map %u len %d\n",
			     epoch, maplen);
			newmap = osdmap_apply_incremental(&p, next,
							  osdc->osdmap,
							  osdc->client->msgr);
			if (IS_ERR(newmap)) {
				err = PTR_ERR(newmap);
				goto bad;
			}
			BUG_ON(!newmap);
			if (newmap != osdc->osdmap) {
				ceph_osdmap_destroy(osdc->osdmap);
				osdc->osdmap = newmap;
			}
		} else {
			dout("ignoring incremental map %u len %d\n",
			     epoch, maplen);
		}
		p = next;
		nr_maps--;
	}
	if (newmap)
		goto done;

	/* full maps */
	ceph_decode_32_safe(&p, end, nr_maps, bad);
	dout(" %d full maps\n", nr_maps);
	while (nr_maps) {
		ceph_decode_need(&p, end, 2*sizeof(u32), bad);
		epoch = ceph_decode_32(&p);
		maplen = ceph_decode_32(&p);
		ceph_decode_need(&p, end, maplen, bad);
		if (nr_maps > 1) {
			dout("skipping non-latest full map %u len %d\n",
			     epoch, maplen);
		} else if (osdc->osdmap && osdc->osdmap->epoch >= epoch) {
			dout("skipping full map %u len %d, "
			     "older than our %u\n", epoch, maplen,
			     osdc->osdmap->epoch);
		} else {
			dout("taking full map %u len %d\n", epoch, maplen);
			newmap = osdmap_decode(&p, p+maplen);
			if (IS_ERR(newmap)) {
				err = PTR_ERR(newmap);
				goto bad;
			}
			BUG_ON(!newmap);
			oldmap = osdc->osdmap;
			osdc->osdmap = newmap;
			if (oldmap)
				ceph_osdmap_destroy(oldmap);
		}
		p += maplen;
		nr_maps--;
	}

done:
	downgrade_write(&osdc->map_sem);
	ceph_monc_got_osdmap(&osdc->client->monc, osdc->osdmap->epoch);
	if (newmap)
		kick_requests(osdc, NULL);
	up_read(&osdc->map_sem);
	return;

bad:
	pr_err("osdc handle_map corrupt msg\n");
	ceph_msg_dump(msg);
	up_write(&osdc->map_sem);
	return;
}


/*
 * A read request prepares specific pages that data is to be read into.
 * When a message is being read off the wire, we call prepare_pages to
 * find those pages.
 *  0 = success, -1 failure.
 */
static int __prepare_pages(struct ceph_connection *con,
			   struct ceph_msg_header *hdr,
			   struct ceph_osd_request *req,
			   u64 tid,
			   struct ceph_msg *m)
{
	struct ceph_osd *osd = con->private;
	struct ceph_osd_client *osdc;
	int ret = -1;
	int data_len = le32_to_cpu(hdr->data_len);
	unsigned data_off = le16_to_cpu(hdr->data_off);

	int want = calc_pages_for(data_off & ~PAGE_MASK, data_len);

	if (!osd)
		return -1;

	osdc = osd->o_osdc;

	dout("__prepare_pages on msg %p tid %llu, has %d pages, want %d\n", m,
	     tid, req->r_num_pages, want);
	if (unlikely(req->r_num_pages < want))
		goto out;
	m->pages = req->r_pages;
	m->nr_pages = req->r_num_pages;
	ret = 0; /* success */
out:
	BUG_ON(ret < 0 || m->nr_pages < want);

	return ret;
}

/*
 * Register request, send initial attempt.
 */
int ceph_osdc_start_request(struct ceph_osd_client *osdc,
			    struct ceph_osd_request *req,
			    bool nofail)
{
	int rc = 0;

	req->r_request->pages = req->r_pages;
	req->r_request->nr_pages = req->r_num_pages;

	register_request(osdc, req);

	down_read(&osdc->map_sem);
	mutex_lock(&osdc->request_mutex);
	/*
	 * a racing kick_requests() may have sent the message for us
	 * while we dropped request_mutex above, so only send now if
	 * the request still hasn't been touched yet.
	 */
	if (req->r_sent == 0) {
		rc = __send_request(osdc, req);
		if (rc) {
			if (nofail) {
				dout("osdc_start_request failed send, "
				     "marking %lld\n", req->r_tid);
				req->r_resend = true;
				rc = 0;
			} else {
				__unregister_request(osdc, req);
			}
		}
	}
	mutex_unlock(&osdc->request_mutex);
	up_read(&osdc->map_sem);
	return rc;
}

/*
 * wait for a request to complete
 */
int ceph_osdc_wait_request(struct ceph_osd_client *osdc,
			   struct ceph_osd_request *req)
{
	int rc;

	rc = wait_for_completion_interruptible(&req->r_completion);
	if (rc < 0) {
		mutex_lock(&osdc->request_mutex);
		__cancel_request(req);
		__unregister_request(osdc, req);
		mutex_unlock(&osdc->request_mutex);
		dout("wait_request tid %llu canceled/timed out\n", req->r_tid);
		return rc;
	}

	dout("wait_request tid %llu result %d\n", req->r_tid, req->r_result);
	return req->r_result;
}

/*
 * sync - wait for all in-flight requests to flush.  avoid starvation.
 */
void ceph_osdc_sync(struct ceph_osd_client *osdc)
{
	struct ceph_osd_request *req;
	u64 last_tid, next_tid = 0;

	mutex_lock(&osdc->request_mutex);
	last_tid = osdc->last_tid;
	while (1) {
		req = __lookup_request_ge(osdc, next_tid);
		if (!req)
			break;
		if (req->r_tid > last_tid)
			break;

		next_tid = req->r_tid + 1;
		if ((req->r_flags & CEPH_OSD_FLAG_WRITE) == 0)
			continue;

		ceph_osdc_get_request(req);
		mutex_unlock(&osdc->request_mutex);
		dout("sync waiting on tid %llu (last is %llu)\n",
		     req->r_tid, last_tid);
		wait_for_completion(&req->r_safe_completion);
		mutex_lock(&osdc->request_mutex);
		ceph_osdc_put_request(req);
	}
	mutex_unlock(&osdc->request_mutex);
	dout("sync done (thru tid %llu)\n", last_tid);
}
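
/*
 * Note on ceph_osdc_sync() (explanatory, not in the original source):
 * last_tid is sampled once up front, so writes submitted after the call
 * begins have larger tids and are not waited on; that is how starvation
 * is avoided.  __lookup_request_ge() lets the scan resume at next_tid
 * after request_mutex was dropped across each wait_for_completion().
 */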

/*
 * init, shutdown
 */
int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
{
	int err;

	dout("init\n");
	osdc->client = client;
	osdc->osdmap = NULL;
	init_rwsem(&osdc->map_sem);
	init_completion(&osdc->map_waiters);
	osdc->last_requested_map = 0;
	mutex_init(&osdc->request_mutex);
	osdc->timeout_tid = 0;
	osdc->last_tid = 0;
	osdc->osds = RB_ROOT;
	INIT_LIST_HEAD(&osdc->osd_lru);
	osdc->requests = RB_ROOT;
	osdc->num_requests = 0;
	INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
	INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);

	schedule_delayed_work(&osdc->osds_timeout_work,
	   round_jiffies_relative(osdc->client->mount_args->osd_idle_ttl * HZ));

	err = -ENOMEM;
	osdc->req_mempool = mempool_create_kmalloc_pool(10,
					sizeof(struct ceph_osd_request));
	if (!osdc->req_mempool)
		goto out;

	err = ceph_msgpool_init(&osdc->msgpool_op, OSD_OP_FRONT_LEN, 10, true);
	if (err < 0)
		goto out_mempool;
	err = ceph_msgpool_init(&osdc->msgpool_op_reply,
				OSD_OPREPLY_FRONT_LEN, 10, true);
	if (err < 0)
		goto out_msgpool;
	return 0;

out_msgpool:
	ceph_msgpool_destroy(&osdc->msgpool_op);
out_mempool:
	mempool_destroy(osdc->req_mempool);
out:
	return err;
}

void ceph_osdc_stop(struct ceph_osd_client *osdc)
{
	cancel_delayed_work_sync(&osdc->timeout_work);
	cancel_delayed_work_sync(&osdc->osds_timeout_work);
	if (osdc->osdmap) {
		ceph_osdmap_destroy(osdc->osdmap);
		osdc->osdmap = NULL;
	}
	remove_old_osds(osdc, 1);
	mempool_destroy(osdc->req_mempool);
	ceph_msgpool_destroy(&osdc->msgpool_op);
	ceph_msgpool_destroy(&osdc->msgpool_op_reply);
}

/*
 * Read some contiguous pages.  If we cross a stripe boundary, shorten
 * *plen.  Return number of bytes read, or error.
 */
int ceph_osdc_readpages(struct ceph_osd_client *osdc,
			struct ceph_vino vino, struct ceph_file_layout *layout,
			u64 off, u64 *plen,
			u32 truncate_seq, u64 truncate_size,
			struct page **pages, int num_pages)
{
	struct ceph_osd_request *req;
	int rc = 0;

	dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino,
	     vino.snap, off, *plen);
	req = ceph_osdc_new_request(osdc, layout, vino, off, plen,
				    CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
				    NULL, 0, truncate_seq, truncate_size, NULL,
				    false, 1);
	if (IS_ERR(req))
		return PTR_ERR(req);

	/* it may be a short read due to an object boundary */
	req->r_pages = pages;
	num_pages = calc_pages_for(off, *plen);
	req->r_num_pages = num_pages;

	dout("readpages final extent is %llu~%llu (%d pages)\n",
	     off, *plen, req->r_num_pages);

	rc = ceph_osdc_start_request(osdc, req, false);
	if (!rc)
		rc = ceph_osdc_wait_request(osdc, req);

	ceph_osdc_put_request(req);
	dout("readpages result %d\n", rc);
	return rc;
}
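
/*
 * Caller-side sketch for the short-read case (illustrative, not in the
 * original source; "want" and the retry step are hypothetical):
 *
 *	u64 len = want;
 *	rc = ceph_osdc_readpages(osdc, vino, layout, off, &len,
 *				 truncate_seq, truncate_size, pages,
 *				 calc_pages_for(off, len));
 *	if (rc >= 0 && len < want)
 *		reissue at off+len for the remaining want-len bytes,
 *		pointing at the corresponding later pages
 */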

/*
 * do a synchronous write on N pages
 */
int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
			 struct ceph_file_layout *layout,
			 struct ceph_snap_context *snapc,
			 u64 off, u64 len,
			 u32 truncate_seq, u64 truncate_size,
			 struct timespec *mtime,
			 struct page **pages, int num_pages,
			 int flags, int do_sync, bool nofail)
{
	struct ceph_osd_request *req;
	int rc = 0;

	BUG_ON(vino.snap != CEPH_NOSNAP);
	req = ceph_osdc_new_request(osdc, layout, vino, off, &len,
				    CEPH_OSD_OP_WRITE,
				    flags | CEPH_OSD_FLAG_ONDISK |
					    CEPH_OSD_FLAG_WRITE,
				    snapc, do_sync,
				    truncate_seq, truncate_size, mtime,
				    nofail, 1);
	if (IS_ERR(req))
		return PTR_ERR(req);

	/* it may be a short write due to an object boundary */
	req->r_pages = pages;
	req->r_num_pages = calc_pages_for(off, len);
	dout("writepages %llu~%llu (%d pages)\n", off, len,
	     req->r_num_pages);

	rc = ceph_osdc_start_request(osdc, req, nofail);
	if (!rc)
		rc = ceph_osdc_wait_request(osdc, req);

	ceph_osdc_put_request(req);
	if (rc == 0)
		rc = len;
	dout("writepages result %d\n", rc);
	return rc;
}

/*
 * handle incoming message
 */
static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
{
	struct ceph_osd *osd = con->private;
	struct ceph_osd_client *osdc;
	int type = le16_to_cpu(msg->hdr.type);

	if (!osd)
		return;
	osdc = osd->o_osdc;

	switch (type) {
	case CEPH_MSG_OSD_MAP:
		ceph_osdc_handle_map(osdc, msg);
		break;
	case CEPH_MSG_OSD_OPREPLY:
		handle_reply(osdc, msg, con);
		break;

	default:
		pr_err("received unknown message type %d %s\n", type,
		       ceph_msg_type_name(type));
	}
	ceph_msg_put(msg);
}

/*
 * lookup and return message for incoming reply
 */
static struct ceph_msg *get_reply(struct ceph_connection *con,
				  struct ceph_msg_header *hdr,
				  int *skip)
{
	struct ceph_osd *osd = con->private;
	struct ceph_osd_client *osdc = osd->o_osdc;
	struct ceph_msg *m;
	struct ceph_osd_request *req;
	int front = le32_to_cpu(hdr->front_len);
	int data_len = le32_to_cpu(hdr->data_len);
	u64 tid;
	int err;

	tid = le64_to_cpu(hdr->tid);
	mutex_lock(&osdc->request_mutex);
	req = __lookup_request(osdc, tid);
	if (!req) {
		*skip = 1;
		m = NULL;
		pr_info("get_reply unknown tid %llu from osd%d\n", tid,
			osd->o_osd);
		goto out;
	}

	if (req->r_con_filling_msg) {
		dout("get_reply revoking msg %p from old con %p\n",
		     req->r_reply, req->r_con_filling_msg);
		ceph_con_revoke_message(req->r_con_filling_msg, req->r_reply);
		ceph_con_put(req->r_con_filling_msg);
	}

	if (front > req->r_reply->front.iov_len) {
		pr_warning("get_reply front %d > preallocated %d\n",
			   front, (int)req->r_reply->front.iov_len);
		m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front, 0, 0, NULL);
		if (IS_ERR(m))
			goto out;
		ceph_msg_put(req->r_reply);
		req->r_reply = m;
	}
	m = ceph_msg_get(req->r_reply);

	if (data_len > 0) {
		err = __prepare_pages(con, hdr, req, tid, m);
		if (err < 0) {
			*skip = 1;
			ceph_msg_put(m);
			m = ERR_PTR(err);
			goto out;	/* don't clobber *skip below */
		}
	}
	*skip = 0;
	req->r_con_filling_msg = ceph_con_get(con);
	dout("get_reply tid %lld %p\n", tid, m);

out:
	mutex_unlock(&osdc->request_mutex);
	return m;
}

static struct ceph_msg *alloc_msg(struct ceph_connection *con,
				  struct ceph_msg_header *hdr,
				  int *skip)
{
	struct ceph_osd *osd = con->private;
	int type = le16_to_cpu(hdr->type);
	int front = le32_to_cpu(hdr->front_len);

	switch (type) {
	case CEPH_MSG_OSD_MAP:
		return ceph_msg_new(type, front, 0, 0, NULL);
	case CEPH_MSG_OSD_OPREPLY:
		return get_reply(con, hdr, skip);
	default:
		pr_info("alloc_msg unexpected msg type %d from osd%d\n", type,
			osd->o_osd);
		*skip = 1;
		return NULL;
	}
}

/*
 * Wrappers to refcount containing ceph_osd struct
 */
static struct ceph_connection *get_osd_con(struct ceph_connection *con)
{
	struct ceph_osd *osd = con->private;
	if (get_osd(osd))
		return con;
	return NULL;
}

static void put_osd_con(struct ceph_connection *con)
{
	struct ceph_osd *osd = con->private;
	put_osd(osd);
}

/*
 * authentication
 */
static int get_authorizer(struct ceph_connection *con,
			  void **buf, int *len, int *proto,
			  void **reply_buf, int *reply_len, int force_new)
{
	struct ceph_osd *o = con->private;
	struct ceph_osd_client *osdc = o->o_osdc;
	struct ceph_auth_client *ac = osdc->client->monc.auth;
	int ret = 0;

	if (force_new && o->o_authorizer) {
		ac->ops->destroy_authorizer(ac, o->o_authorizer);
		o->o_authorizer = NULL;
	}
	if (o->o_authorizer == NULL) {
		ret = ac->ops->create_authorizer(
			ac, CEPH_ENTITY_TYPE_OSD,
			&o->o_authorizer,
			&o->o_authorizer_buf,
			&o->o_authorizer_buf_len,
			&o->o_authorizer_reply_buf,
			&o->o_authorizer_reply_buf_len);
		if (ret)
			return ret;
	}

	*proto = ac->protocol;
	*buf = o->o_authorizer_buf;
	*len = o->o_authorizer_buf_len;
	*reply_buf = o->o_authorizer_reply_buf;
	*reply_len = o->o_authorizer_reply_buf_len;
	return 0;
}


static int verify_authorizer_reply(struct ceph_connection *con, int len)
{
	struct ceph_osd *o = con->private;
	struct ceph_osd_client *osdc = o->o_osdc;
	struct ceph_auth_client *ac = osdc->client->monc.auth;

	return ac->ops->verify_authorizer_reply(ac, o->o_authorizer, len);
}

static int invalidate_authorizer(struct ceph_connection *con)
{
	struct ceph_osd *o = con->private;
	struct ceph_osd_client *osdc = o->o_osdc;
	struct ceph_auth_client *ac = osdc->client->monc.auth;

	if (ac->ops->invalidate_authorizer)
		ac->ops->invalidate_authorizer(ac, CEPH_ENTITY_TYPE_OSD);

	return ceph_monc_validate_auth(&osdc->client->monc);
}

static const struct ceph_connection_operations osd_con_ops = {
	.get = get_osd_con,
	.put = put_osd_con,
	.dispatch = dispatch,
	.get_authorizer = get_authorizer,
	.verify_authorizer_reply = verify_authorizer_reply,
	.invalidate_authorizer = invalidate_authorizer,
	.alloc_msg = alloc_msg,
	.fault = osd_reset,
};