// SPDX-License-Identifier: GPL-2.0

#include <linux/ceph/ceph_debug.h>

#include <linux/module.h>
#include <linux/err.h>
#include <linux/highmem.h>
#include <linux/mm.h>
#include <linux/pagemap.h>
#include <linux/slab.h>
#include <linux/uaccess.h>
#ifdef CONFIG_BLOCK
#include <linux/bio.h>
#endif

#include <linux/ceph/ceph_features.h>
#include <linux/ceph/libceph.h>
#include <linux/ceph/osd_client.h>
#include <linux/ceph/messenger.h>
#include <linux/ceph/decode.h>
#include <linux/ceph/auth.h>
#include <linux/ceph/pagelist.h>
#include <linux/ceph/striper.h>

#define OSD_OPREPLY_FRONT_LEN   512

static struct kmem_cache        *ceph_osd_request_cache;

static const struct ceph_connection_operations osd_con_ops;

/*
 * Implement client access to distributed object storage cluster.
 *
 * All data objects are stored within a cluster/cloud of OSDs, or
 * "object storage devices."  (Note that Ceph OSDs have _nothing_ to
 * do with the T10 OSD extensions to SCSI.)  Ceph OSDs are simply
 * remote daemons serving up and coordinating consistent and safe
 * access to storage.
 *
 * Cluster membership and the mapping of data objects onto storage devices
 * are described by the osd map.
 *
 * We keep track of pending OSD requests (read, write), resubmit
 * requests to different OSDs when the cluster topology/data layout
 * change, or retry the affected requests when the communications
 * channel with an OSD is reset.
 */

static void link_request(struct ceph_osd *osd, struct ceph_osd_request *req);
static void unlink_request(struct ceph_osd *osd, struct ceph_osd_request *req);
static void link_linger(struct ceph_osd *osd,
                        struct ceph_osd_linger_request *lreq);
static void unlink_linger(struct ceph_osd *osd,
                          struct ceph_osd_linger_request *lreq);
static void clear_backoffs(struct ceph_osd *osd);

#if 1
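/*
 * down_read_trylock() can only succeed while no writer holds the rwsem,
 * so a successful trylock proves we were not write-locked.  (A queued
 * writer may produce a false "write-locked" answer, which is acceptable
 * for these debug-only assertions.)
 */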
static inline bool rwsem_is_wrlocked(struct rw_semaphore *sem)
{
        bool wrlocked = true;

        if (unlikely(down_read_trylock(sem))) {
                wrlocked = false;
                up_read(sem);
        }

        return wrlocked;
}
static inline void verify_osdc_locked(struct ceph_osd_client *osdc)
{
        WARN_ON(!rwsem_is_locked(&osdc->lock));
}
static inline void verify_osdc_wrlocked(struct ceph_osd_client *osdc)
{
        WARN_ON(!rwsem_is_wrlocked(&osdc->lock));
}
static inline void verify_osd_locked(struct ceph_osd *osd)
{
        struct ceph_osd_client *osdc = osd->o_osdc;

        WARN_ON(!(mutex_is_locked(&osd->lock) &&
                  rwsem_is_locked(&osdc->lock)) &&
                !rwsem_is_wrlocked(&osdc->lock));
}
static inline void verify_lreq_locked(struct ceph_osd_linger_request *lreq)
{
        WARN_ON(!mutex_is_locked(&lreq->lock));
}
#else
static inline void verify_osdc_locked(struct ceph_osd_client *osdc) { }
static inline void verify_osdc_wrlocked(struct ceph_osd_client *osdc) { }
static inline void verify_osd_locked(struct ceph_osd *osd) { }
static inline void verify_lreq_locked(struct ceph_osd_linger_request *lreq) { }
#endif

/*
 * calculate the mapping of a file extent onto an object, shortening the
 * extent as necessary if it crosses an object boundary.  the resulting
 * object number/offset/length is what the caller uses to fill out the
 * osd op in the request message.
 */
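/*
 * Worked example (assuming a layout with 4M objects and stripe_unit ==
 * object_size): off=6M, *plen=4M crosses the boundary of object 1, so
 * the mapping comes back as objnum=1, objoff=2M, objlen=2M and *plen is
 * trimmed to 2M; the caller is expected to submit the remaining 2M as a
 * separate request.
 */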
static int calc_layout(struct ceph_file_layout *layout, u64 off, u64 *plen,
                        u64 *objnum, u64 *objoff, u64 *objlen)
{
        u64 orig_len = *plen;
        u32 xlen;

        /* object extent? */
        ceph_calc_file_object_mapping(layout, off, orig_len, objnum,
                                          objoff, &xlen);
        *objlen = xlen;
        if (*objlen < orig_len) {
                *plen = *objlen;
                dout(" skipping last %llu, final file extent %llu~%llu\n",
                     orig_len - *plen, off, *plen);
        }

        dout("calc_layout objnum=%llx %llu~%llu\n", *objnum, *objoff, *objlen);
        return 0;
}

static void ceph_osd_data_init(struct ceph_osd_data *osd_data)
{
        memset(osd_data, 0, sizeof (*osd_data));
        osd_data->type = CEPH_OSD_DATA_TYPE_NONE;
}

static void ceph_osd_data_pages_init(struct ceph_osd_data *osd_data,
                        struct page **pages, u64 length, u32 alignment,
                        bool pages_from_pool, bool own_pages)
{
        osd_data->type = CEPH_OSD_DATA_TYPE_PAGES;
        osd_data->pages = pages;
        osd_data->length = length;
        osd_data->alignment = alignment;
        osd_data->pages_from_pool = pages_from_pool;
        osd_data->own_pages = own_pages;
}

static void ceph_osd_data_pagelist_init(struct ceph_osd_data *osd_data,
                        struct ceph_pagelist *pagelist)
{
        osd_data->type = CEPH_OSD_DATA_TYPE_PAGELIST;
        osd_data->pagelist = pagelist;
}

#ifdef CONFIG_BLOCK
static void ceph_osd_data_bio_init(struct ceph_osd_data *osd_data,
                                   struct ceph_bio_iter *bio_pos,
                                   u32 bio_length)
{
        osd_data->type = CEPH_OSD_DATA_TYPE_BIO;
        osd_data->bio_pos = *bio_pos;
        osd_data->bio_length = bio_length;
}
#endif /* CONFIG_BLOCK */

static void ceph_osd_data_bvecs_init(struct ceph_osd_data *osd_data,
                                     struct ceph_bvec_iter *bvec_pos,
                                     u32 num_bvecs)
{
        osd_data->type = CEPH_OSD_DATA_TYPE_BVECS;
        osd_data->bvec_pos = *bvec_pos;
        osd_data->num_bvecs = num_bvecs;
}

#define osd_req_op_data(oreq, whch, typ, fld)                           \
({                                                                      \
        struct ceph_osd_request *__oreq = (oreq);                       \
        unsigned int __whch = (whch);                                   \
        BUG_ON(__whch >= __oreq->r_num_ops);                            \
        &__oreq->r_ops[__whch].typ.fld;                                 \
})
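/*
 * For example, osd_req_op_data(req, 0, extent, osd_data) evaluates to
 * &req->r_ops[0].extent.osd_data, BUG()ing if the request has no op 0.
 */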

static struct ceph_osd_data *
osd_req_op_raw_data_in(struct ceph_osd_request *osd_req, unsigned int which)
{
        BUG_ON(which >= osd_req->r_num_ops);

        return &osd_req->r_ops[which].raw_data_in;
}

struct ceph_osd_data *
osd_req_op_extent_osd_data(struct ceph_osd_request *osd_req,
                        unsigned int which)
{
        return osd_req_op_data(osd_req, which, extent, osd_data);
}
EXPORT_SYMBOL(osd_req_op_extent_osd_data);

void osd_req_op_raw_data_in_pages(struct ceph_osd_request *osd_req,
                        unsigned int which, struct page **pages,
                        u64 length, u32 alignment,
                        bool pages_from_pool, bool own_pages)
{
        struct ceph_osd_data *osd_data;

        osd_data = osd_req_op_raw_data_in(osd_req, which);
        ceph_osd_data_pages_init(osd_data, pages, length, alignment,
                                pages_from_pool, own_pages);
}
EXPORT_SYMBOL(osd_req_op_raw_data_in_pages);

void osd_req_op_extent_osd_data_pages(struct ceph_osd_request *osd_req,
                        unsigned int which, struct page **pages,
                        u64 length, u32 alignment,
                        bool pages_from_pool, bool own_pages)
{
        struct ceph_osd_data *osd_data;

        osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
        ceph_osd_data_pages_init(osd_data, pages, length, alignment,
                                pages_from_pool, own_pages);
}
EXPORT_SYMBOL(osd_req_op_extent_osd_data_pages);

void osd_req_op_extent_osd_data_pagelist(struct ceph_osd_request *osd_req,
                        unsigned int which, struct ceph_pagelist *pagelist)
{
        struct ceph_osd_data *osd_data;

        osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
        ceph_osd_data_pagelist_init(osd_data, pagelist);
}
EXPORT_SYMBOL(osd_req_op_extent_osd_data_pagelist);

#ifdef CONFIG_BLOCK
void osd_req_op_extent_osd_data_bio(struct ceph_osd_request *osd_req,
                                    unsigned int which,
                                    struct ceph_bio_iter *bio_pos,
                                    u32 bio_length)
{
        struct ceph_osd_data *osd_data;

        osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
        ceph_osd_data_bio_init(osd_data, bio_pos, bio_length);
}
EXPORT_SYMBOL(osd_req_op_extent_osd_data_bio);
#endif /* CONFIG_BLOCK */

void osd_req_op_extent_osd_data_bvecs(struct ceph_osd_request *osd_req,
                                      unsigned int which,
                                      struct bio_vec *bvecs, u32 num_bvecs,
                                      u32 bytes)
{
        struct ceph_osd_data *osd_data;
        struct ceph_bvec_iter it = {
                .bvecs = bvecs,
                .iter = { .bi_size = bytes },
        };

        osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
        ceph_osd_data_bvecs_init(osd_data, &it, num_bvecs);
}
EXPORT_SYMBOL(osd_req_op_extent_osd_data_bvecs);

void osd_req_op_extent_osd_data_bvec_pos(struct ceph_osd_request *osd_req,
                                         unsigned int which,
                                         struct ceph_bvec_iter *bvec_pos)
{
        struct ceph_osd_data *osd_data;

        osd_data = osd_req_op_data(osd_req, which, extent, osd_data);
        ceph_osd_data_bvecs_init(osd_data, bvec_pos, 0);
}
EXPORT_SYMBOL(osd_req_op_extent_osd_data_bvec_pos);

static void osd_req_op_cls_request_info_pagelist(
                        struct ceph_osd_request *osd_req,
                        unsigned int which, struct ceph_pagelist *pagelist)
{
        struct ceph_osd_data *osd_data;

        osd_data = osd_req_op_data(osd_req, which, cls, request_info);
        ceph_osd_data_pagelist_init(osd_data, pagelist);
}

void osd_req_op_cls_request_data_pagelist(
                        struct ceph_osd_request *osd_req,
                        unsigned int which, struct ceph_pagelist *pagelist)
{
        struct ceph_osd_data *osd_data;

        osd_data = osd_req_op_data(osd_req, which, cls, request_data);
        ceph_osd_data_pagelist_init(osd_data, pagelist);
        osd_req->r_ops[which].cls.indata_len += pagelist->length;
        osd_req->r_ops[which].indata_len += pagelist->length;
}
EXPORT_SYMBOL(osd_req_op_cls_request_data_pagelist);

void osd_req_op_cls_request_data_pages(struct ceph_osd_request *osd_req,
                        unsigned int which, struct page **pages, u64 length,
                        u32 alignment, bool pages_from_pool, bool own_pages)
{
        struct ceph_osd_data *osd_data;

        osd_data = osd_req_op_data(osd_req, which, cls, request_data);
        ceph_osd_data_pages_init(osd_data, pages, length, alignment,
                                pages_from_pool, own_pages);
        osd_req->r_ops[which].cls.indata_len += length;
        osd_req->r_ops[which].indata_len += length;
}
EXPORT_SYMBOL(osd_req_op_cls_request_data_pages);

void osd_req_op_cls_request_data_bvecs(struct ceph_osd_request *osd_req,
                                       unsigned int which,
                                       struct bio_vec *bvecs, u32 num_bvecs,
                                       u32 bytes)
{
        struct ceph_osd_data *osd_data;
        struct ceph_bvec_iter it = {
                .bvecs = bvecs,
                .iter = { .bi_size = bytes },
        };

        osd_data = osd_req_op_data(osd_req, which, cls, request_data);
        ceph_osd_data_bvecs_init(osd_data, &it, num_bvecs);
        osd_req->r_ops[which].cls.indata_len += bytes;
        osd_req->r_ops[which].indata_len += bytes;
}
EXPORT_SYMBOL(osd_req_op_cls_request_data_bvecs);

void osd_req_op_cls_response_data_pages(struct ceph_osd_request *osd_req,
                        unsigned int which, struct page **pages, u64 length,
                        u32 alignment, bool pages_from_pool, bool own_pages)
{
        struct ceph_osd_data *osd_data;

        osd_data = osd_req_op_data(osd_req, which, cls, response_data);
        ceph_osd_data_pages_init(osd_data, pages, length, alignment,
                                pages_from_pool, own_pages);
}
EXPORT_SYMBOL(osd_req_op_cls_response_data_pages);

static u64 ceph_osd_data_length(struct ceph_osd_data *osd_data)
{
        switch (osd_data->type) {
        case CEPH_OSD_DATA_TYPE_NONE:
                return 0;
        case CEPH_OSD_DATA_TYPE_PAGES:
                return osd_data->length;
        case CEPH_OSD_DATA_TYPE_PAGELIST:
                return (u64)osd_data->pagelist->length;
#ifdef CONFIG_BLOCK
        case CEPH_OSD_DATA_TYPE_BIO:
                return (u64)osd_data->bio_length;
#endif /* CONFIG_BLOCK */
        case CEPH_OSD_DATA_TYPE_BVECS:
                return osd_data->bvec_pos.iter.bi_size;
        default:
                WARN(true, "unrecognized data type %d\n", (int)osd_data->type);
                return 0;
        }
}

static void ceph_osd_data_release(struct ceph_osd_data *osd_data)
{
        if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES && osd_data->own_pages) {
                int num_pages;

                num_pages = calc_pages_for((u64)osd_data->alignment,
                                                (u64)osd_data->length);
                ceph_release_page_vector(osd_data->pages, num_pages);
        }
        ceph_osd_data_init(osd_data);
}

static void osd_req_op_data_release(struct ceph_osd_request *osd_req,
                        unsigned int which)
{
        struct ceph_osd_req_op *op;

        BUG_ON(which >= osd_req->r_num_ops);
        op = &osd_req->r_ops[which];

        switch (op->op) {
        case CEPH_OSD_OP_READ:
        case CEPH_OSD_OP_WRITE:
        case CEPH_OSD_OP_WRITEFULL:
                ceph_osd_data_release(&op->extent.osd_data);
                break;
        case CEPH_OSD_OP_CALL:
                ceph_osd_data_release(&op->cls.request_info);
                ceph_osd_data_release(&op->cls.request_data);
                ceph_osd_data_release(&op->cls.response_data);
                break;
        case CEPH_OSD_OP_SETXATTR:
        case CEPH_OSD_OP_CMPXATTR:
                ceph_osd_data_release(&op->xattr.osd_data);
                break;
        case CEPH_OSD_OP_STAT:
                ceph_osd_data_release(&op->raw_data_in);
                break;
        case CEPH_OSD_OP_NOTIFY_ACK:
                ceph_osd_data_release(&op->notify_ack.request_data);
                break;
        case CEPH_OSD_OP_NOTIFY:
                ceph_osd_data_release(&op->notify.request_data);
                ceph_osd_data_release(&op->notify.response_data);
                break;
        case CEPH_OSD_OP_LIST_WATCHERS:
                ceph_osd_data_release(&op->list_watchers.response_data);
                break;
        default:
                break;
        }
}

/*
 * Assumes @t is zero-initialized.
 */
static void target_init(struct ceph_osd_request_target *t)
{
        ceph_oid_init(&t->base_oid);
        ceph_oloc_init(&t->base_oloc);
        ceph_oid_init(&t->target_oid);
        ceph_oloc_init(&t->target_oloc);

        ceph_osds_init(&t->acting);
        ceph_osds_init(&t->up);
        t->size = -1;
        t->min_size = -1;

        t->osd = CEPH_HOMELESS_OSD;
}

static void target_copy(struct ceph_osd_request_target *dest,
                        const struct ceph_osd_request_target *src)
{
        ceph_oid_copy(&dest->base_oid, &src->base_oid);
        ceph_oloc_copy(&dest->base_oloc, &src->base_oloc);
        ceph_oid_copy(&dest->target_oid, &src->target_oid);
        ceph_oloc_copy(&dest->target_oloc, &src->target_oloc);

        dest->pgid = src->pgid; /* struct */
        dest->spgid = src->spgid; /* struct */
        dest->pg_num = src->pg_num;
        dest->pg_num_mask = src->pg_num_mask;
        ceph_osds_copy(&dest->acting, &src->acting);
        ceph_osds_copy(&dest->up, &src->up);
        dest->size = src->size;
        dest->min_size = src->min_size;
        dest->sort_bitwise = src->sort_bitwise;

        dest->flags = src->flags;
        dest->paused = src->paused;

        dest->epoch = src->epoch;
        dest->last_force_resend = src->last_force_resend;

        dest->osd = src->osd;
}

static void target_destroy(struct ceph_osd_request_target *t)
{
        ceph_oid_destroy(&t->base_oid);
        ceph_oloc_destroy(&t->base_oloc);
        ceph_oid_destroy(&t->target_oid);
        ceph_oloc_destroy(&t->target_oloc);
}

/*
 * requests
 */
static void request_release_checks(struct ceph_osd_request *req)
{
        WARN_ON(!RB_EMPTY_NODE(&req->r_node));
        WARN_ON(!RB_EMPTY_NODE(&req->r_mc_node));
        WARN_ON(!list_empty(&req->r_unsafe_item));
        WARN_ON(req->r_osd);
}

static void ceph_osdc_release_request(struct kref *kref)
{
        struct ceph_osd_request *req = container_of(kref,
                                            struct ceph_osd_request, r_kref);
        unsigned int which;

        dout("%s %p (r_request %p r_reply %p)\n", __func__, req,
             req->r_request, req->r_reply);
        request_release_checks(req);

        if (req->r_request)
                ceph_msg_put(req->r_request);
        if (req->r_reply)
                ceph_msg_put(req->r_reply);

        for (which = 0; which < req->r_num_ops; which++)
                osd_req_op_data_release(req, which);

        target_destroy(&req->r_t);
        ceph_put_snap_context(req->r_snapc);

        if (req->r_mempool)
                mempool_free(req, req->r_osdc->req_mempool);
        else if (req->r_num_ops <= CEPH_OSD_SLAB_OPS)
                kmem_cache_free(ceph_osd_request_cache, req);
        else
                kfree(req);
}

void ceph_osdc_get_request(struct ceph_osd_request *req)
{
        dout("%s %p (was %d)\n", __func__, req,
             kref_read(&req->r_kref));
        kref_get(&req->r_kref);
}
EXPORT_SYMBOL(ceph_osdc_get_request);

void ceph_osdc_put_request(struct ceph_osd_request *req)
{
        if (req) {
                dout("%s %p (was %d)\n", __func__, req,
                     kref_read(&req->r_kref));
                kref_put(&req->r_kref, ceph_osdc_release_request);
        }
}
EXPORT_SYMBOL(ceph_osdc_put_request);

static void request_init(struct ceph_osd_request *req)
{
        /* req only, each op is zeroed in _osd_req_op_init() */
        memset(req, 0, sizeof(*req));

        kref_init(&req->r_kref);
        init_completion(&req->r_completion);
        RB_CLEAR_NODE(&req->r_node);
        RB_CLEAR_NODE(&req->r_mc_node);
        INIT_LIST_HEAD(&req->r_unsafe_item);

        target_init(&req->r_t);
}

/*
 * This is ugly, but it allows us to reuse linger registration and ping
 * requests, keeping the structure of the code around send_linger{_ping}()
 * reasonable.  Setting up a min_nr=2 mempool for each linger request
 * and dealing with copying ops (this blasts req only, watch op remains
 * intact) isn't any better.
 */
static void request_reinit(struct ceph_osd_request *req)
{
        struct ceph_osd_client *osdc = req->r_osdc;
        bool mempool = req->r_mempool;
        unsigned int num_ops = req->r_num_ops;
        u64 snapid = req->r_snapid;
        struct ceph_snap_context *snapc = req->r_snapc;
        bool linger = req->r_linger;
        struct ceph_msg *request_msg = req->r_request;
        struct ceph_msg *reply_msg = req->r_reply;

        dout("%s req %p\n", __func__, req);
        WARN_ON(kref_read(&req->r_kref) != 1);
        request_release_checks(req);

        WARN_ON(kref_read(&request_msg->kref) != 1);
        WARN_ON(kref_read(&reply_msg->kref) != 1);
        target_destroy(&req->r_t);

        request_init(req);
        req->r_osdc = osdc;
        req->r_mempool = mempool;
        req->r_num_ops = num_ops;
        req->r_snapid = snapid;
        req->r_snapc = snapc;
        req->r_linger = linger;
        req->r_request = request_msg;
        req->r_reply = reply_msg;
}

struct ceph_osd_request *ceph_osdc_alloc_request(struct ceph_osd_client *osdc,
                                               struct ceph_snap_context *snapc,
                                               unsigned int num_ops,
                                               bool use_mempool,
                                               gfp_t gfp_flags)
{
        struct ceph_osd_request *req;

        if (use_mempool) {
                BUG_ON(num_ops > CEPH_OSD_SLAB_OPS);
                req = mempool_alloc(osdc->req_mempool, gfp_flags);
        } else if (num_ops <= CEPH_OSD_SLAB_OPS) {
                req = kmem_cache_alloc(ceph_osd_request_cache, gfp_flags);
        } else {
                BUG_ON(num_ops > CEPH_OSD_MAX_OPS);
                req = kmalloc(sizeof(*req) + num_ops * sizeof(req->r_ops[0]),
                              gfp_flags);
        }
        if (unlikely(!req))
                return NULL;

        request_init(req);
        req->r_osdc = osdc;
        req->r_mempool = use_mempool;
        req->r_num_ops = num_ops;
        req->r_snapid = CEPH_NOSNAP;
        req->r_snapc = ceph_get_snap_context(snapc);

        dout("%s req %p\n", __func__, req);
        return req;
}
EXPORT_SYMBOL(ceph_osdc_alloc_request);

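/*
 * This appears to match the versioned object_locator_t encoding: 8-byte
 * pool id, 4-byte legacy preferred osd (-1), 4-byte key length (always
 * empty here) and a 4-byte namespace length, plus the namespace string
 * itself.
 */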
static int ceph_oloc_encoding_size(const struct ceph_object_locator *oloc)
{
        return 8 + 4 + 4 + 4 + (oloc->pool_ns ? oloc->pool_ns->len : 0);
}

int ceph_osdc_alloc_messages(struct ceph_osd_request *req, gfp_t gfp)
{
        struct ceph_osd_client *osdc = req->r_osdc;
        struct ceph_msg *msg;
        int msg_size;

        WARN_ON(ceph_oid_empty(&req->r_base_oid));
        WARN_ON(ceph_oloc_empty(&req->r_base_oloc));

        /* create request message */
        msg_size = CEPH_ENCODING_START_BLK_LEN +
                        CEPH_PGID_ENCODING_LEN + 1; /* spgid */
        msg_size += 4 + 4 + 4; /* hash, osdmap_epoch, flags */
        msg_size += CEPH_ENCODING_START_BLK_LEN +
                        sizeof(struct ceph_osd_reqid); /* reqid */
        msg_size += sizeof(struct ceph_blkin_trace_info); /* trace */
        msg_size += 4 + sizeof(struct ceph_timespec); /* client_inc, mtime */
        msg_size += CEPH_ENCODING_START_BLK_LEN +
                        ceph_oloc_encoding_size(&req->r_base_oloc); /* oloc */
        msg_size += 4 + req->r_base_oid.name_len; /* oid */
        msg_size += 2 + req->r_num_ops * sizeof(struct ceph_osd_op);
        msg_size += 8; /* snapid */
        msg_size += 8; /* snap_seq */
        msg_size += 4 + 8 * (req->r_snapc ? req->r_snapc->num_snaps : 0);
        msg_size += 4 + 8; /* retry_attempt, features */

        if (req->r_mempool)
                msg = ceph_msgpool_get(&osdc->msgpool_op, 0);
        else
                msg = ceph_msg_new(CEPH_MSG_OSD_OP, msg_size, gfp, true);
        if (!msg)
                return -ENOMEM;

        memset(msg->front.iov_base, 0, msg->front.iov_len);
        req->r_request = msg;

        /* create reply message */
        msg_size = OSD_OPREPLY_FRONT_LEN;
        msg_size += req->r_base_oid.name_len;
        msg_size += req->r_num_ops * sizeof(struct ceph_osd_op);

        if (req->r_mempool)
                msg = ceph_msgpool_get(&osdc->msgpool_op_reply, 0);
        else
                msg = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, msg_size, gfp, true);
        if (!msg)
                return -ENOMEM;

        req->r_reply = msg;

        return 0;
}
EXPORT_SYMBOL(ceph_osdc_alloc_messages);

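/*
 * __CEPH_FORALL_OSD_OPS() below is an X-macro that expands GENERATE_CASE
 * once for each known CEPH_OSD_OP_* opcode, so the switch ends up with a
 * case for every op the client understands.
 */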
static bool osd_req_opcode_valid(u16 opcode)
{
        switch (opcode) {
#define GENERATE_CASE(op, opcode, str)  case CEPH_OSD_OP_##op: return true;
__CEPH_FORALL_OSD_OPS(GENERATE_CASE)
#undef GENERATE_CASE
        default:
                return false;
        }
}

/*
 * This is an osd op init function for opcodes that have no data or
 * other information associated with them.  It also serves as a
 * common init routine for all the other init functions, below.
 */
static struct ceph_osd_req_op *
_osd_req_op_init(struct ceph_osd_request *osd_req, unsigned int which,
                 u16 opcode, u32 flags)
{
        struct ceph_osd_req_op *op;

        BUG_ON(which >= osd_req->r_num_ops);
        BUG_ON(!osd_req_opcode_valid(opcode));

        op = &osd_req->r_ops[which];
        memset(op, 0, sizeof (*op));
        op->op = opcode;
        op->flags = flags;

        return op;
}

void osd_req_op_init(struct ceph_osd_request *osd_req,
                     unsigned int which, u16 opcode, u32 flags)
{
        (void)_osd_req_op_init(osd_req, which, opcode, flags);
}
EXPORT_SYMBOL(osd_req_op_init);

void osd_req_op_extent_init(struct ceph_osd_request *osd_req,
                                unsigned int which, u16 opcode,
                                u64 offset, u64 length,
                                u64 truncate_size, u32 truncate_seq)
{
        struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
                                                      opcode, 0);
        size_t payload_len = 0;

        BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
               opcode != CEPH_OSD_OP_WRITEFULL && opcode != CEPH_OSD_OP_ZERO &&
               opcode != CEPH_OSD_OP_TRUNCATE);

        op->extent.offset = offset;
        op->extent.length = length;
        op->extent.truncate_size = truncate_size;
        op->extent.truncate_seq = truncate_seq;
        if (opcode == CEPH_OSD_OP_WRITE || opcode == CEPH_OSD_OP_WRITEFULL)
                payload_len += length;

        op->indata_len = payload_len;
}
EXPORT_SYMBOL(osd_req_op_extent_init);

void osd_req_op_extent_update(struct ceph_osd_request *osd_req,
                                unsigned int which, u64 length)
{
        struct ceph_osd_req_op *op;
        u64 previous;

        BUG_ON(which >= osd_req->r_num_ops);
        op = &osd_req->r_ops[which];
        previous = op->extent.length;

        if (length == previous)
                return;         /* Nothing to do */
        BUG_ON(length > previous);

        op->extent.length = length;
        if (op->op == CEPH_OSD_OP_WRITE || op->op == CEPH_OSD_OP_WRITEFULL)
                op->indata_len -= previous - length;
}
EXPORT_SYMBOL(osd_req_op_extent_update);

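/*
 * Duplicate extent op @which into slot @which + 1, advancing the extent
 * offset by @offset_inc and shrinking the extent length (and, for
 * writes, the payload length) by the same amount.
 */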
void osd_req_op_extent_dup_last(struct ceph_osd_request *osd_req,
                                unsigned int which, u64 offset_inc)
{
        struct ceph_osd_req_op *op, *prev_op;

        BUG_ON(which + 1 >= osd_req->r_num_ops);

        prev_op = &osd_req->r_ops[which];
        op = _osd_req_op_init(osd_req, which + 1, prev_op->op, prev_op->flags);
        /* dup previous one */
        op->indata_len = prev_op->indata_len;
        op->outdata_len = prev_op->outdata_len;
        op->extent = prev_op->extent;
        /* adjust offset */
        op->extent.offset += offset_inc;
        op->extent.length -= offset_inc;

        if (op->op == CEPH_OSD_OP_WRITE || op->op == CEPH_OSD_OP_WRITEFULL)
                op->indata_len -= offset_inc;
}
EXPORT_SYMBOL(osd_req_op_extent_dup_last);

void osd_req_op_cls_init(struct ceph_osd_request *osd_req, unsigned int which,
                        u16 opcode, const char *class, const char *method)
{
        struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
                                                      opcode, 0);
        struct ceph_pagelist *pagelist;
        size_t payload_len = 0;
        size_t size;

        BUG_ON(opcode != CEPH_OSD_OP_CALL);

        pagelist = kmalloc(sizeof (*pagelist), GFP_NOFS);
        BUG_ON(!pagelist);
        ceph_pagelist_init(pagelist);

        op->cls.class_name = class;
        size = strlen(class);
        BUG_ON(size > (size_t) U8_MAX);
        op->cls.class_len = size;
        ceph_pagelist_append(pagelist, class, size);
        payload_len += size;

        op->cls.method_name = method;
        size = strlen(method);
        BUG_ON(size > (size_t) U8_MAX);
        op->cls.method_len = size;
        ceph_pagelist_append(pagelist, method, size);
        payload_len += size;

        osd_req_op_cls_request_info_pagelist(osd_req, which, pagelist);

        op->indata_len = payload_len;
}
EXPORT_SYMBOL(osd_req_op_cls_init);

int osd_req_op_xattr_init(struct ceph_osd_request *osd_req, unsigned int which,
                          u16 opcode, const char *name, const void *value,
                          size_t size, u8 cmp_op, u8 cmp_mode)
{
        struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
                                                      opcode, 0);
        struct ceph_pagelist *pagelist;
        size_t payload_len;

        BUG_ON(opcode != CEPH_OSD_OP_SETXATTR && opcode != CEPH_OSD_OP_CMPXATTR);

        pagelist = kmalloc(sizeof(*pagelist), GFP_NOFS);
        if (!pagelist)
                return -ENOMEM;

        ceph_pagelist_init(pagelist);

        payload_len = strlen(name);
        op->xattr.name_len = payload_len;
        ceph_pagelist_append(pagelist, name, payload_len);

        op->xattr.value_len = size;
        ceph_pagelist_append(pagelist, value, size);
        payload_len += size;

        op->xattr.cmp_op = cmp_op;
        op->xattr.cmp_mode = cmp_mode;

        ceph_osd_data_pagelist_init(&op->xattr.osd_data, pagelist);
        op->indata_len = payload_len;
        return 0;
}
EXPORT_SYMBOL(osd_req_op_xattr_init);

/*
 * @watch_opcode: CEPH_OSD_WATCH_OP_*
 */
static void osd_req_op_watch_init(struct ceph_osd_request *req, int which,
                                  u64 cookie, u8 watch_opcode)
{
        struct ceph_osd_req_op *op;

        op = _osd_req_op_init(req, which, CEPH_OSD_OP_WATCH, 0);
        op->watch.cookie = cookie;
        op->watch.op = watch_opcode;
        op->watch.gen = 0;
}

void osd_req_op_alloc_hint_init(struct ceph_osd_request *osd_req,
                                unsigned int which,
                                u64 expected_object_size,
                                u64 expected_write_size)
{
        struct ceph_osd_req_op *op = _osd_req_op_init(osd_req, which,
                                                      CEPH_OSD_OP_SETALLOCHINT,
                                                      0);

        op->alloc_hint.expected_object_size = expected_object_size;
        op->alloc_hint.expected_write_size = expected_write_size;

        /*
         * CEPH_OSD_OP_SETALLOCHINT op is advisory and therefore deemed
         * not worth a feature bit.  Set FAILOK per-op flag to make
         * sure older osds don't trip over an unsupported opcode.
         */
        op->flags |= CEPH_OSD_OP_FLAG_FAILOK;
}
EXPORT_SYMBOL(osd_req_op_alloc_hint_init);

static void ceph_osdc_msg_data_add(struct ceph_msg *msg,
                                struct ceph_osd_data *osd_data)
{
        u64 length = ceph_osd_data_length(osd_data);

        if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGES) {
                BUG_ON(length > (u64) SIZE_MAX);
                if (length)
                        ceph_msg_data_add_pages(msg, osd_data->pages,
                                        length, osd_data->alignment);
        } else if (osd_data->type == CEPH_OSD_DATA_TYPE_PAGELIST) {
                BUG_ON(!length);
                ceph_msg_data_add_pagelist(msg, osd_data->pagelist);
#ifdef CONFIG_BLOCK
        } else if (osd_data->type == CEPH_OSD_DATA_TYPE_BIO) {
                ceph_msg_data_add_bio(msg, &osd_data->bio_pos, length);
#endif
        } else if (osd_data->type == CEPH_OSD_DATA_TYPE_BVECS) {
                ceph_msg_data_add_bvecs(msg, &osd_data->bvec_pos);
        } else {
                BUG_ON(osd_data->type != CEPH_OSD_DATA_TYPE_NONE);
        }
}

static u32 osd_req_encode_op(struct ceph_osd_op *dst,
                             const struct ceph_osd_req_op *src)
{
        if (WARN_ON(!osd_req_opcode_valid(src->op))) {
                pr_err("unrecognized osd opcode %d\n", src->op);

                return 0;
        }

        switch (src->op) {
        case CEPH_OSD_OP_STAT:
                break;
        case CEPH_OSD_OP_READ:
        case CEPH_OSD_OP_WRITE:
        case CEPH_OSD_OP_WRITEFULL:
        case CEPH_OSD_OP_ZERO:
        case CEPH_OSD_OP_TRUNCATE:
                dst->extent.offset = cpu_to_le64(src->extent.offset);
                dst->extent.length = cpu_to_le64(src->extent.length);
                dst->extent.truncate_size =
                        cpu_to_le64(src->extent.truncate_size);
                dst->extent.truncate_seq =
                        cpu_to_le32(src->extent.truncate_seq);
                break;
        case CEPH_OSD_OP_CALL:
                dst->cls.class_len = src->cls.class_len;
                dst->cls.method_len = src->cls.method_len;
                dst->cls.indata_len = cpu_to_le32(src->cls.indata_len);
                break;
        case CEPH_OSD_OP_WATCH:
                dst->watch.cookie = cpu_to_le64(src->watch.cookie);
                dst->watch.ver = cpu_to_le64(0);
                dst->watch.op = src->watch.op;
                dst->watch.gen = cpu_to_le32(src->watch.gen);
                break;
        case CEPH_OSD_OP_NOTIFY_ACK:
                break;
        case CEPH_OSD_OP_NOTIFY:
                dst->notify.cookie = cpu_to_le64(src->notify.cookie);
                break;
        case CEPH_OSD_OP_LIST_WATCHERS:
                break;
        case CEPH_OSD_OP_SETALLOCHINT:
                dst->alloc_hint.expected_object_size =
                    cpu_to_le64(src->alloc_hint.expected_object_size);
                dst->alloc_hint.expected_write_size =
                    cpu_to_le64(src->alloc_hint.expected_write_size);
                break;
        case CEPH_OSD_OP_SETXATTR:
        case CEPH_OSD_OP_CMPXATTR:
                dst->xattr.name_len = cpu_to_le32(src->xattr.name_len);
                dst->xattr.value_len = cpu_to_le32(src->xattr.value_len);
                dst->xattr.cmp_op = src->xattr.cmp_op;
                dst->xattr.cmp_mode = src->xattr.cmp_mode;
                break;
        case CEPH_OSD_OP_CREATE:
        case CEPH_OSD_OP_DELETE:
                break;
        default:
                pr_err("unsupported osd opcode %s\n",
                        ceph_osd_op_name(src->op));
                WARN_ON(1);

                return 0;
        }

        dst->op = cpu_to_le16(src->op);
        dst->flags = cpu_to_le32(src->flags);
        dst->payload_len = cpu_to_le32(src->indata_len);

        return src->indata_len;
}

/*
 * build new request AND message, calculate layout, and adjust file
 * extent as needed.
 *
 * if the file was recently truncated, we include information about its
 * old and new size so that the object can be updated appropriately.  (we
 * avoid synchronously deleting truncated objects because it's slow.)
 */
struct ceph_osd_request *ceph_osdc_new_request(struct ceph_osd_client *osdc,
                                               struct ceph_file_layout *layout,
                                               struct ceph_vino vino,
                                               u64 off, u64 *plen,
                                               unsigned int which, int num_ops,
                                               int opcode, int flags,
                                               struct ceph_snap_context *snapc,
                                               u32 truncate_seq,
                                               u64 truncate_size,
                                               bool use_mempool)
{
        struct ceph_osd_request *req;
        u64 objnum = 0;
        u64 objoff = 0;
        u64 objlen = 0;
        int r;

        BUG_ON(opcode != CEPH_OSD_OP_READ && opcode != CEPH_OSD_OP_WRITE &&
               opcode != CEPH_OSD_OP_ZERO && opcode != CEPH_OSD_OP_TRUNCATE &&
               opcode != CEPH_OSD_OP_CREATE && opcode != CEPH_OSD_OP_DELETE);

        req = ceph_osdc_alloc_request(osdc, snapc, num_ops, use_mempool,
                                        GFP_NOFS);
        if (!req) {
                r = -ENOMEM;
                goto fail;
        }

        /* calculate max write size */
        r = calc_layout(layout, off, plen, &objnum, &objoff, &objlen);
        if (r)
                goto fail;

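        /*
         * For the extent ops below, truncate_size is file-relative on
         * entry and must be made object-relative, clamped to the object
         * size; a (truncate_seq == 1, truncate_size == -1ULL) pair is
         * treated as "no truncation" and passed through unchanged.
         */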
        if (opcode == CEPH_OSD_OP_CREATE || opcode == CEPH_OSD_OP_DELETE) {
                osd_req_op_init(req, which, opcode, 0);
        } else {
                u32 object_size = layout->object_size;
                u32 object_base = off - objoff;
                if (!(truncate_seq == 1 && truncate_size == -1ULL)) {
                        if (truncate_size <= object_base) {
                                truncate_size = 0;
                        } else {
                                truncate_size -= object_base;
                                if (truncate_size > object_size)
                                        truncate_size = object_size;
                        }
                }
                osd_req_op_extent_init(req, which, opcode, objoff, objlen,
                                       truncate_size, truncate_seq);
        }

        req->r_abort_on_full = true;
        req->r_flags = flags;
        req->r_base_oloc.pool = layout->pool_id;
        req->r_base_oloc.pool_ns = ceph_try_get_string(layout->pool_ns);
        ceph_oid_printf(&req->r_base_oid, "%llx.%08llx", vino.ino, objnum);

        req->r_snapid = vino.snap;
        if (flags & CEPH_OSD_FLAG_WRITE)
                req->r_data_offset = off;

        r = ceph_osdc_alloc_messages(req, GFP_NOFS);
        if (r)
                goto fail;

        return req;

fail:
        ceph_osdc_put_request(req);
        return ERR_PTR(r);
}
EXPORT_SYMBOL(ceph_osdc_new_request);

/*
 * We keep osd requests in an rbtree, sorted by ->r_tid.
 */
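/*
 * Each DEFINE_RB_FUNCS(x, ...) invocation (linux/ceph/libceph.h)
 * generates the insert_x(), erase_x() and lookup_x() helpers used
 * throughout this file.
 */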
DEFINE_RB_FUNCS(request, struct ceph_osd_request, r_tid, r_node)
DEFINE_RB_FUNCS(request_mc, struct ceph_osd_request, r_tid, r_mc_node)

static bool osd_homeless(struct ceph_osd *osd)
{
        return osd->o_osd == CEPH_HOMELESS_OSD;
}

static bool osd_registered(struct ceph_osd *osd)
{
        verify_osdc_locked(osd->o_osdc);

        return !RB_EMPTY_NODE(&osd->o_node);
}

/*
 * Assumes @osd is zero-initialized.
 */
static void osd_init(struct ceph_osd *osd)
{
        refcount_set(&osd->o_ref, 1);
        RB_CLEAR_NODE(&osd->o_node);
        osd->o_requests = RB_ROOT;
        osd->o_linger_requests = RB_ROOT;
        osd->o_backoff_mappings = RB_ROOT;
        osd->o_backoffs_by_id = RB_ROOT;
        INIT_LIST_HEAD(&osd->o_osd_lru);
        INIT_LIST_HEAD(&osd->o_keepalive_item);
        osd->o_incarnation = 1;
        mutex_init(&osd->lock);
}

static void osd_cleanup(struct ceph_osd *osd)
{
        WARN_ON(!RB_EMPTY_NODE(&osd->o_node));
        WARN_ON(!RB_EMPTY_ROOT(&osd->o_requests));
        WARN_ON(!RB_EMPTY_ROOT(&osd->o_linger_requests));
        WARN_ON(!RB_EMPTY_ROOT(&osd->o_backoff_mappings));
        WARN_ON(!RB_EMPTY_ROOT(&osd->o_backoffs_by_id));
        WARN_ON(!list_empty(&osd->o_osd_lru));
        WARN_ON(!list_empty(&osd->o_keepalive_item));

        if (osd->o_auth.authorizer) {
                WARN_ON(osd_homeless(osd));
                ceph_auth_destroy_authorizer(osd->o_auth.authorizer);
        }
}

/*
 * Track open sessions with osds.
 */
static struct ceph_osd *create_osd(struct ceph_osd_client *osdc, int onum)
{
        struct ceph_osd *osd;

        WARN_ON(onum == CEPH_HOMELESS_OSD);

        osd = kzalloc(sizeof(*osd), GFP_NOIO | __GFP_NOFAIL);
        osd_init(osd);
        osd->o_osdc = osdc;
        osd->o_osd = onum;

        ceph_con_init(&osd->o_con, osd, &osd_con_ops, &osdc->client->msgr);

        return osd;
}

static struct ceph_osd *get_osd(struct ceph_osd *osd)
{
        if (refcount_inc_not_zero(&osd->o_ref)) {
                dout("get_osd %p %d -> %d\n", osd, refcount_read(&osd->o_ref)-1,
                     refcount_read(&osd->o_ref));
                return osd;
        } else {
                dout("get_osd %p FAIL\n", osd);
                return NULL;
        }
}

static void put_osd(struct ceph_osd *osd)
{
        dout("put_osd %p %d -> %d\n", osd, refcount_read(&osd->o_ref),
             refcount_read(&osd->o_ref) - 1);
        if (refcount_dec_and_test(&osd->o_ref)) {
                osd_cleanup(osd);
                kfree(osd);
        }
}

DEFINE_RB_FUNCS(osd, struct ceph_osd, o_osd, o_node)

static void __move_osd_to_lru(struct ceph_osd *osd)
{
        struct ceph_osd_client *osdc = osd->o_osdc;

        dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);
        BUG_ON(!list_empty(&osd->o_osd_lru));

        spin_lock(&osdc->osd_lru_lock);
        list_add_tail(&osd->o_osd_lru, &osdc->osd_lru);
        spin_unlock(&osdc->osd_lru_lock);

        osd->lru_ttl = jiffies + osdc->client->options->osd_idle_ttl;
}

static void maybe_move_osd_to_lru(struct ceph_osd *osd)
{
        if (RB_EMPTY_ROOT(&osd->o_requests) &&
            RB_EMPTY_ROOT(&osd->o_linger_requests))
                __move_osd_to_lru(osd);
}

static void __remove_osd_from_lru(struct ceph_osd *osd)
{
        struct ceph_osd_client *osdc = osd->o_osdc;

        dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);

        spin_lock(&osdc->osd_lru_lock);
        if (!list_empty(&osd->o_osd_lru))
                list_del_init(&osd->o_osd_lru);
        spin_unlock(&osdc->osd_lru_lock);
}

/*
 * Close the connection and assign any leftover requests to the
 * homeless session.
 */
static void close_osd(struct ceph_osd *osd)
{
        struct ceph_osd_client *osdc = osd->o_osdc;
        struct rb_node *n;

        verify_osdc_wrlocked(osdc);
        dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);

        ceph_con_close(&osd->o_con);

        for (n = rb_first(&osd->o_requests); n; ) {
                struct ceph_osd_request *req =
                    rb_entry(n, struct ceph_osd_request, r_node);

                n = rb_next(n); /* unlink_request() */

                dout(" reassigning req %p tid %llu\n", req, req->r_tid);
                unlink_request(osd, req);
                link_request(&osdc->homeless_osd, req);
        }
        for (n = rb_first(&osd->o_linger_requests); n; ) {
                struct ceph_osd_linger_request *lreq =
                    rb_entry(n, struct ceph_osd_linger_request, node);

                n = rb_next(n); /* unlink_linger() */

                dout(" reassigning lreq %p linger_id %llu\n", lreq,
                     lreq->linger_id);
                unlink_linger(osd, lreq);
                link_linger(&osdc->homeless_osd, lreq);
        }
        clear_backoffs(osd);

        __remove_osd_from_lru(osd);
        erase_osd(&osdc->osds, osd);
        put_osd(osd);
}

/*
 * reset osd connect
 */
static int reopen_osd(struct ceph_osd *osd)
{
        struct ceph_entity_addr *peer_addr;

        dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);

        if (RB_EMPTY_ROOT(&osd->o_requests) &&
            RB_EMPTY_ROOT(&osd->o_linger_requests)) {
                close_osd(osd);
                return -ENODEV;
        }

        peer_addr = &osd->o_osdc->osdmap->osd_addr[osd->o_osd];
        if (!memcmp(peer_addr, &osd->o_con.peer_addr, sizeof (*peer_addr)) &&
                        !ceph_con_opened(&osd->o_con)) {
                struct rb_node *n;

                dout("osd addr hasn't changed and connection never opened, "
                     "letting msgr retry\n");
                /* touch each r_stamp for handle_timeout()'s benefit */
                for (n = rb_first(&osd->o_requests); n; n = rb_next(n)) {
                        struct ceph_osd_request *req =
                            rb_entry(n, struct ceph_osd_request, r_node);
                        req->r_stamp = jiffies;
                }

                return -EAGAIN;
        }

        ceph_con_close(&osd->o_con);
        ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd, peer_addr);
        osd->o_incarnation++;

        return 0;
}

static struct ceph_osd *lookup_create_osd(struct ceph_osd_client *osdc, int o,
                                          bool wrlocked)
{
        struct ceph_osd *osd;

        if (wrlocked)
                verify_osdc_wrlocked(osdc);
        else
                verify_osdc_locked(osdc);

        if (o != CEPH_HOMELESS_OSD)
                osd = lookup_osd(&osdc->osds, o);
        else
                osd = &osdc->homeless_osd;
        if (!osd) {
                if (!wrlocked)
                        return ERR_PTR(-EAGAIN);

                osd = create_osd(osdc, o);
                insert_osd(&osdc->osds, osd);
                ceph_con_open(&osd->o_con, CEPH_ENTITY_TYPE_OSD, osd->o_osd,
                              &osdc->osdmap->osd_addr[osd->o_osd]);
        }

        dout("%s osdc %p osd%d -> osd %p\n", __func__, osdc, o, osd);
        return osd;
}

/*
 * Create request <-> OSD session relation.
 *
 * @req has to be assigned a tid, @osd may be homeless.
 */
static void link_request(struct ceph_osd *osd, struct ceph_osd_request *req)
{
        verify_osd_locked(osd);
        WARN_ON(!req->r_tid || req->r_osd);
        dout("%s osd %p osd%d req %p tid %llu\n", __func__, osd, osd->o_osd,
             req, req->r_tid);

        if (!osd_homeless(osd))
                __remove_osd_from_lru(osd);
        else
                atomic_inc(&osd->o_osdc->num_homeless);

        get_osd(osd);
        insert_request(&osd->o_requests, req);
        req->r_osd = osd;
}

static void unlink_request(struct ceph_osd *osd, struct ceph_osd_request *req)
{
        verify_osd_locked(osd);
        WARN_ON(req->r_osd != osd);
        dout("%s osd %p osd%d req %p tid %llu\n", __func__, osd, osd->o_osd,
             req, req->r_tid);

        req->r_osd = NULL;
        erase_request(&osd->o_requests, req);
        put_osd(osd);

        if (!osd_homeless(osd))
                maybe_move_osd_to_lru(osd);
        else
                atomic_dec(&osd->o_osdc->num_homeless);
}

static bool __pool_full(struct ceph_pg_pool_info *pi)
{
        return pi->flags & CEPH_POOL_FLAG_FULL;
}

static bool have_pool_full(struct ceph_osd_client *osdc)
{
        struct rb_node *n;

        for (n = rb_first(&osdc->osdmap->pg_pools); n; n = rb_next(n)) {
                struct ceph_pg_pool_info *pi =
                    rb_entry(n, struct ceph_pg_pool_info, node);

                if (__pool_full(pi))
                        return true;
        }

        return false;
}

static bool pool_full(struct ceph_osd_client *osdc, s64 pool_id)
{
        struct ceph_pg_pool_info *pi;

        pi = ceph_pg_pool_by_id(osdc->osdmap, pool_id);
        if (!pi)
                return false;

        return __pool_full(pi);
}

/*
 * Returns whether a request should be blocked from being sent
 * based on the current osdmap and osd_client settings.
 */
static bool target_should_be_paused(struct ceph_osd_client *osdc,
                                    const struct ceph_osd_request_target *t,
                                    struct ceph_pg_pool_info *pi)
{
        bool pauserd = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD);
        bool pausewr = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR) ||
                       ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
                       __pool_full(pi);

        WARN_ON(pi->id != t->target_oloc.pool);
        return ((t->flags & CEPH_OSD_FLAG_READ) && pauserd) ||
               ((t->flags & CEPH_OSD_FLAG_WRITE) && pausewr) ||
               (osdc->osdmap->epoch < osdc->epoch_barrier);
}

enum calc_target_result {
        CALC_TARGET_NO_ACTION = 0,
        CALC_TARGET_NEED_RESEND,
        CALC_TARGET_POOL_DNE,
};

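/*
 * Recompute the mapping of @t against the current osdmap: returns
 * CALC_TARGET_NO_ACTION if nothing the request depends on has changed,
 * CALC_TARGET_NEED_RESEND if it must be re-sent (mapping change,
 * unpause or relevant PG split), or CALC_TARGET_POOL_DNE if the target
 * pool no longer exists.
 */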
1383 static enum calc_target_result calc_target(struct ceph_osd_client *osdc,
1384                                            struct ceph_osd_request_target *t,
1385                                            struct ceph_connection *con,
1386                                            bool any_change)
1387 {
1388         struct ceph_pg_pool_info *pi;
1389         struct ceph_pg pgid, last_pgid;
1390         struct ceph_osds up, acting;
1391         bool force_resend = false;
1392         bool unpaused = false;
1393         bool legacy_change;
1394         bool split = false;
1395         bool sort_bitwise = ceph_osdmap_flag(osdc, CEPH_OSDMAP_SORTBITWISE);
1396         bool recovery_deletes = ceph_osdmap_flag(osdc,
1397                                                  CEPH_OSDMAP_RECOVERY_DELETES);
1398         enum calc_target_result ct_res;
1399         int ret;
1400
1401         t->epoch = osdc->osdmap->epoch;
1402         pi = ceph_pg_pool_by_id(osdc->osdmap, t->base_oloc.pool);
1403         if (!pi) {
1404                 t->osd = CEPH_HOMELESS_OSD;
1405                 ct_res = CALC_TARGET_POOL_DNE;
1406                 goto out;
1407         }
1408
1409         if (osdc->osdmap->epoch == pi->last_force_request_resend) {
1410                 if (t->last_force_resend < pi->last_force_request_resend) {
1411                         t->last_force_resend = pi->last_force_request_resend;
1412                         force_resend = true;
1413                 } else if (t->last_force_resend == 0) {
1414                         force_resend = true;
1415                 }
1416         }
1417
1418         /* apply tiering */
1419         ceph_oid_copy(&t->target_oid, &t->base_oid);
1420         ceph_oloc_copy(&t->target_oloc, &t->base_oloc);
1421         if ((t->flags & CEPH_OSD_FLAG_IGNORE_OVERLAY) == 0) {
1422                 if (t->flags & CEPH_OSD_FLAG_READ && pi->read_tier >= 0)
1423                         t->target_oloc.pool = pi->read_tier;
1424                 if (t->flags & CEPH_OSD_FLAG_WRITE && pi->write_tier >= 0)
1425                         t->target_oloc.pool = pi->write_tier;
1426
1427                 pi = ceph_pg_pool_by_id(osdc->osdmap, t->target_oloc.pool);
1428                 if (!pi) {
1429                         t->osd = CEPH_HOMELESS_OSD;
1430                         ct_res = CALC_TARGET_POOL_DNE;
1431                         goto out;
1432                 }
1433         }
1434
1435         ret = __ceph_object_locator_to_pg(pi, &t->target_oid, &t->target_oloc,
1436                                           &pgid);
1437         if (ret) {
1438                 WARN_ON(ret != -ENOENT);
1439                 t->osd = CEPH_HOMELESS_OSD;
1440                 ct_res = CALC_TARGET_POOL_DNE;
1441                 goto out;
1442         }
1443         last_pgid.pool = pgid.pool;
1444         last_pgid.seed = ceph_stable_mod(pgid.seed, t->pg_num, t->pg_num_mask);
1445
1446         ceph_pg_to_up_acting_osds(osdc->osdmap, pi, &pgid, &up, &acting);
1447         if (any_change &&
1448             ceph_is_new_interval(&t->acting,
1449                                  &acting,
1450                                  &t->up,
1451                                  &up,
1452                                  t->size,
1453                                  pi->size,
1454                                  t->min_size,
1455                                  pi->min_size,
1456                                  t->pg_num,
1457                                  pi->pg_num,
1458                                  t->sort_bitwise,
1459                                  sort_bitwise,
1460                                  t->recovery_deletes,
1461                                  recovery_deletes,
1462                                  &last_pgid))
1463                 force_resend = true;
1464
1465         if (t->paused && !target_should_be_paused(osdc, t, pi)) {
1466                 t->paused = false;
1467                 unpaused = true;
1468         }
1469         legacy_change = ceph_pg_compare(&t->pgid, &pgid) ||
1470                         ceph_osds_changed(&t->acting, &acting, any_change);
1471         if (t->pg_num)
1472                 split = ceph_pg_is_split(&last_pgid, t->pg_num, pi->pg_num);
1473
1474         if (legacy_change || force_resend || split) {
1475                 t->pgid = pgid; /* struct */
1476                 ceph_pg_to_primary_shard(osdc->osdmap, pi, &pgid, &t->spgid);
1477                 ceph_osds_copy(&t->acting, &acting);
1478                 ceph_osds_copy(&t->up, &up);
1479                 t->size = pi->size;
1480                 t->min_size = pi->min_size;
1481                 t->pg_num = pi->pg_num;
1482                 t->pg_num_mask = pi->pg_num_mask;
1483                 t->sort_bitwise = sort_bitwise;
1484                 t->recovery_deletes = recovery_deletes;
1485
1486                 t->osd = acting.primary;
1487         }
1488
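        /*
         * Resend if the request was unpaused, if its PG mapping changed
         * (legacy_change), if the pool forced a resend, or if the PG
         * split and the peer OSD is new enough to require resends on
         * split (RESEND_ON_SPLIT).
         */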
1489         if (unpaused || legacy_change || force_resend ||
1490             (split && con && CEPH_HAVE_FEATURE(con->peer_features,
1491                                                RESEND_ON_SPLIT)))
1492                 ct_res = CALC_TARGET_NEED_RESEND;
1493         else
1494                 ct_res = CALC_TARGET_NO_ACTION;
1495
1496 out:
1497         dout("%s t %p -> ct_res %d osd %d\n", __func__, t, ct_res, t->osd);
1498         return ct_res;
1499 }
1500
1501 static struct ceph_spg_mapping *alloc_spg_mapping(void)
1502 {
1503         struct ceph_spg_mapping *spg;
1504
1505         spg = kmalloc(sizeof(*spg), GFP_NOIO);
1506         if (!spg)
1507                 return NULL;
1508
1509         RB_CLEAR_NODE(&spg->node);
1510         spg->backoffs = RB_ROOT;
1511         return spg;
1512 }
1513
1514 static void free_spg_mapping(struct ceph_spg_mapping *spg)
1515 {
1516         WARN_ON(!RB_EMPTY_NODE(&spg->node));
1517         WARN_ON(!RB_EMPTY_ROOT(&spg->backoffs));
1518
1519         kfree(spg);
1520 }
1521
1522 /*
1523  * rbtree of ceph_spg_mapping for handling map<spg_t, ...>, similar to
1524  * ceph_pg_mapping.  Used to track OSD backoffs -- a backoff [range] is
1525  * defined only within a specific spgid; it does not pass anything to
1526  * children on split, or to another primary.
1527  */
1528 DEFINE_RB_FUNCS2(spg_mapping, struct ceph_spg_mapping, spgid, ceph_spg_compare,
1529                  RB_BYPTR, const struct ceph_spg *, node)
1530
1531 static u64 hoid_get_bitwise_key(const struct ceph_hobject_id *hoid)
1532 {
1533         return hoid->is_max ? 0x100000000ull : hoid->hash_reverse_bits;
1534 }
1535
1536 static void hoid_get_effective_key(const struct ceph_hobject_id *hoid,
1537                                    void **pkey, size_t *pkey_len)
1538 {
1539         if (hoid->key_len) {
1540                 *pkey = hoid->key;
1541                 *pkey_len = hoid->key_len;
1542         } else {
1543                 *pkey = hoid->oid;
1544                 *pkey_len = hoid->oid_len;
1545         }
1546 }
1547
1548 static int compare_names(const void *name1, size_t name1_len,
1549                          const void *name2, size_t name2_len)
1550 {
1551         int ret;
1552
1553         ret = memcmp(name1, name2, min(name1_len, name2_len));
1554         if (!ret) {
1555                 if (name1_len < name2_len)
1556                         ret = -1;
1557                 else if (name1_len > name2_len)
1558                         ret = 1;
1559         }
1560         return ret;
1561 }
1562
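/*
 * Total order on hobject_ts, matching bitwise hobject_t sorting on the
 * OSDs: is_max first, then pool, reversed hash bits, namespace,
 * effective key (the locator key if set, the oid otherwise), oid and
 * finally snapid.  Returns -1, 0 or 1.
 */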
1563 static int hoid_compare(const struct ceph_hobject_id *lhs,
1564                         const struct ceph_hobject_id *rhs)
1565 {
1566         void *effective_key1, *effective_key2;
1567         size_t effective_key1_len, effective_key2_len;
1568         int ret;
1569
1570         if (lhs->is_max < rhs->is_max)
1571                 return -1;
1572         if (lhs->is_max > rhs->is_max)
1573                 return 1;
1574
1575         if (lhs->pool < rhs->pool)
1576                 return -1;
1577         if (lhs->pool > rhs->pool)
1578                 return 1;
1579
1580         if (hoid_get_bitwise_key(lhs) < hoid_get_bitwise_key(rhs))
1581                 return -1;
1582         if (hoid_get_bitwise_key(lhs) > hoid_get_bitwise_key(rhs))
1583                 return 1;
1584
1585         ret = compare_names(lhs->nspace, lhs->nspace_len,
1586                             rhs->nspace, rhs->nspace_len);
1587         if (ret)
1588                 return ret;
1589
1590         hoid_get_effective_key(lhs, &effective_key1, &effective_key1_len);
1591         hoid_get_effective_key(rhs, &effective_key2, &effective_key2_len);
1592         ret = compare_names(effective_key1, effective_key1_len,
1593                             effective_key2, effective_key2_len);
1594         if (ret)
1595                 return ret;
1596
1597         ret = compare_names(lhs->oid, lhs->oid_len, rhs->oid, rhs->oid_len);
1598         if (ret)
1599                 return ret;
1600
1601         if (lhs->snapid < rhs->snapid)
1602                 return -1;
1603         if (lhs->snapid > rhs->snapid)
1604                 return 1;
1605
1606         return 0;
1607 }
1608
1609 /*
1610  * For decoding ->begin and ->end of MOSDBackoff only -- no MIN/MAX
1611  * compat stuff here.
1612  *
1613  * Assumes @hoid is zero-initialized.
1614  */
1615 static int decode_hoid(void **p, void *end, struct ceph_hobject_id *hoid)
1616 {
1617         u8 struct_v;
1618         u32 struct_len;
1619         int ret;
1620
1621         ret = ceph_start_decoding(p, end, 4, "hobject_t", &struct_v,
1622                                   &struct_len);
1623         if (ret)
1624                 return ret;
1625
1626         if (struct_v < 4) {
1627                 pr_err("got struct_v %d < 4 of hobject_t\n", struct_v);
1628                 goto e_inval;
1629         }
1630
1631         hoid->key = ceph_extract_encoded_string(p, end, &hoid->key_len,
1632                                                 GFP_NOIO);
1633         if (IS_ERR(hoid->key)) {
1634                 ret = PTR_ERR(hoid->key);
1635                 hoid->key = NULL;
1636                 return ret;
1637         }
1638
1639         hoid->oid = ceph_extract_encoded_string(p, end, &hoid->oid_len,
1640                                                 GFP_NOIO);
1641         if (IS_ERR(hoid->oid)) {
1642                 ret = PTR_ERR(hoid->oid);
1643                 hoid->oid = NULL;
1644                 return ret;
1645         }
1646
1647         ceph_decode_64_safe(p, end, hoid->snapid, e_inval);
1648         ceph_decode_32_safe(p, end, hoid->hash, e_inval);
1649         ceph_decode_8_safe(p, end, hoid->is_max, e_inval);
1650
1651         hoid->nspace = ceph_extract_encoded_string(p, end, &hoid->nspace_len,
1652                                                    GFP_NOIO);
1653         if (IS_ERR(hoid->nspace)) {
1654                 ret = PTR_ERR(hoid->nspace);
1655                 hoid->nspace = NULL;
1656                 return ret;
1657         }
1658
1659         ceph_decode_64_safe(p, end, hoid->pool, e_inval);
1660
1661         ceph_hoid_build_hash_cache(hoid);
1662         return 0;
1663
1664 e_inval:
1665         return -EINVAL;
1666 }
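
/*
 * A minimal usage sketch (hypothetical caller): decode_hoid() can fail
 * after some of the strings have already been allocated, so on error a
 * zero-initialized hoid should still be handed to free_hoid():
 *
 *      hoid = kzalloc(sizeof(*hoid), GFP_NOIO);
 *      if (!hoid)
 *              return -ENOMEM;
 *      ret = decode_hoid(&p, end, hoid);
 *      if (ret) {
 *              free_hoid(hoid);
 *              return ret;
 *      }
 */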
1667
1668 static int hoid_encoding_size(const struct ceph_hobject_id *hoid)
1669 {
1670         return 8 + 4 + 1 + 8 + /* snapid, hash, is_max, pool */
1671                4 + hoid->key_len + 4 + hoid->oid_len + 4 + hoid->nspace_len;
1672 }
1673
1674 static void encode_hoid(void **p, void *end, const struct ceph_hobject_id *hoid)
1675 {
1676         ceph_start_encoding(p, 4, 3, hoid_encoding_size(hoid));
1677         ceph_encode_string(p, end, hoid->key, hoid->key_len);
1678         ceph_encode_string(p, end, hoid->oid, hoid->oid_len);
1679         ceph_encode_64(p, hoid->snapid);
1680         ceph_encode_32(p, hoid->hash);
1681         ceph_encode_8(p, hoid->is_max);
1682         ceph_encode_string(p, end, hoid->nspace, hoid->nspace_len);
1683         ceph_encode_64(p, hoid->pool);
1684 }
1685
1686 static void free_hoid(struct ceph_hobject_id *hoid)
1687 {
1688         if (hoid) {
1689                 kfree(hoid->key);
1690                 kfree(hoid->oid);
1691                 kfree(hoid->nspace);
1692                 kfree(hoid);
1693         }
1694 }
1695
1696 static struct ceph_osd_backoff *alloc_backoff(void)
1697 {
1698         struct ceph_osd_backoff *backoff;
1699
1700         backoff = kzalloc(sizeof(*backoff), GFP_NOIO);
1701         if (!backoff)
1702                 return NULL;
1703
1704         RB_CLEAR_NODE(&backoff->spg_node);
1705         RB_CLEAR_NODE(&backoff->id_node);
1706         return backoff;
1707 }
1708
1709 static void free_backoff(struct ceph_osd_backoff *backoff)
1710 {
1711         WARN_ON(!RB_EMPTY_NODE(&backoff->spg_node));
1712         WARN_ON(!RB_EMPTY_NODE(&backoff->id_node));
1713
1714         free_hoid(backoff->begin);
1715         free_hoid(backoff->end);
1716         kfree(backoff);
1717 }
1718
1719 /*
1720  * Within a specific spgid, backoffs are managed by ->begin hoid.
1721  */
1722 DEFINE_RB_INSDEL_FUNCS2(backoff, struct ceph_osd_backoff, begin, hoid_compare,
1723                         RB_BYVAL, spg_node);
1724
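/*
 * Find the backoff whose [begin, end) interval contains @hoid, if any.
 * Intervals are assumed not to overlap within a tree, so it is enough
 * to walk by ->begin and check against ->end on the way right.
 */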
1725 static struct ceph_osd_backoff *lookup_containing_backoff(struct rb_root *root,
1726                                             const struct ceph_hobject_id *hoid)
1727 {
1728         struct rb_node *n = root->rb_node;
1729
1730         while (n) {
1731                 struct ceph_osd_backoff *cur =
1732                     rb_entry(n, struct ceph_osd_backoff, spg_node);
1733                 int cmp;
1734
1735                 cmp = hoid_compare(hoid, cur->begin);
1736                 if (cmp < 0) {
1737                         n = n->rb_left;
1738                 } else if (cmp > 0) {
1739                         if (hoid_compare(hoid, cur->end) < 0)
1740                                 return cur;
1741
1742                         n = n->rb_right;
1743                 } else {
1744                         return cur;
1745                 }
1746         }
1747
1748         return NULL;
1749 }
1750
1751 /*
1752  * Each backoff has a unique id within its OSD session.
1753  */
1754 DEFINE_RB_FUNCS(backoff_by_id, struct ceph_osd_backoff, id, id_node)
1755
1756 static void clear_backoffs(struct ceph_osd *osd)
1757 {
1758         while (!RB_EMPTY_ROOT(&osd->o_backoff_mappings)) {
1759                 struct ceph_spg_mapping *spg =
1760                     rb_entry(rb_first(&osd->o_backoff_mappings),
1761                              struct ceph_spg_mapping, node);
1762
1763                 while (!RB_EMPTY_ROOT(&spg->backoffs)) {
1764                         struct ceph_osd_backoff *backoff =
1765                             rb_entry(rb_first(&spg->backoffs),
1766                                      struct ceph_osd_backoff, spg_node);
1767
1768                         erase_backoff(&spg->backoffs, backoff);
1769                         erase_backoff_by_id(&osd->o_backoffs_by_id, backoff);
1770                         free_backoff(backoff);
1771                 }
1772                 erase_spg_mapping(&osd->o_backoff_mappings, spg);
1773                 free_spg_mapping(spg);
1774         }
1775 }
1776
1777 /*
1778  * Set up a temporary, non-owning view into @t.
1779  */
1780 static void hoid_fill_from_target(struct ceph_hobject_id *hoid,
1781                                   const struct ceph_osd_request_target *t)
1782 {
1783         hoid->key = NULL;
1784         hoid->key_len = 0;
1785         hoid->oid = t->target_oid.name;
1786         hoid->oid_len = t->target_oid.name_len;
1787         hoid->snapid = CEPH_NOSNAP;
1788         hoid->hash = t->pgid.seed;
1789         hoid->is_max = false;
1790         if (t->target_oloc.pool_ns) {
1791                 hoid->nspace = t->target_oloc.pool_ns->str;
1792                 hoid->nspace_len = t->target_oloc.pool_ns->len;
1793         } else {
1794                 hoid->nspace = NULL;
1795                 hoid->nspace_len = 0;
1796         }
1797         hoid->pool = t->target_oloc.pool;
1798         ceph_hoid_build_hash_cache(hoid);
1799 }
1800
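/*
 * Returns true if @req should be held back (plugged) because the OSD
 * has a backoff covering its target object; the expectation is that it
 * gets resent once the OSD unblocks the range.
 */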
1801 static bool should_plug_request(struct ceph_osd_request *req)
1802 {
1803         struct ceph_osd *osd = req->r_osd;
1804         struct ceph_spg_mapping *spg;
1805         struct ceph_osd_backoff *backoff;
1806         struct ceph_hobject_id hoid;
1807
1808         spg = lookup_spg_mapping(&osd->o_backoff_mappings, &req->r_t.spgid);
1809         if (!spg)
1810                 return false;
1811
1812         hoid_fill_from_target(&hoid, &req->r_t);
1813         backoff = lookup_containing_backoff(&spg->backoffs, &hoid);
1814         if (!backoff)
1815                 return false;
1816
1817         dout("%s req %p tid %llu backoff osd%d spgid %llu.%xs%d id %llu\n",
1818              __func__, req, req->r_tid, osd->o_osd, backoff->spgid.pgid.pool,
1819              backoff->spgid.pgid.seed, backoff->spgid.shard, backoff->id);
1820         return true;
1821 }
1822
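/*
 * Attach each op's data items to the outgoing request message (request
 * data) or to the preallocated reply message (response data), so the
 * messenger moves them as the data portion.  The sum of indata_lens
 * must match msg->data_length -- see the WARN_ON at the end.
 */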
1823 static void setup_request_data(struct ceph_osd_request *req,
1824                                struct ceph_msg *msg)
1825 {
1826         u32 data_len = 0;
1827         int i;
1828
1829         if (!list_empty(&msg->data))
1830                 return;
1831
1832         WARN_ON(msg->data_length);
1833         for (i = 0; i < req->r_num_ops; i++) {
1834                 struct ceph_osd_req_op *op = &req->r_ops[i];
1835
1836                 switch (op->op) {
1837                 /* request */
1838                 case CEPH_OSD_OP_WRITE:
1839                 case CEPH_OSD_OP_WRITEFULL:
1840                         WARN_ON(op->indata_len != op->extent.length);
1841                         ceph_osdc_msg_data_add(msg, &op->extent.osd_data);
1842                         break;
1843                 case CEPH_OSD_OP_SETXATTR:
1844                 case CEPH_OSD_OP_CMPXATTR:
1845                         WARN_ON(op->indata_len != op->xattr.name_len +
1846                                                   op->xattr.value_len);
1847                         ceph_osdc_msg_data_add(msg, &op->xattr.osd_data);
1848                         break;
1849                 case CEPH_OSD_OP_NOTIFY_ACK:
1850                         ceph_osdc_msg_data_add(msg,
1851                                                &op->notify_ack.request_data);
1852                         break;
1853
1854                 /* reply */
1855                 case CEPH_OSD_OP_STAT:
1856                         ceph_osdc_msg_data_add(req->r_reply,
1857                                                &op->raw_data_in);
1858                         break;
1859                 case CEPH_OSD_OP_READ:
1860                         ceph_osdc_msg_data_add(req->r_reply,
1861                                                &op->extent.osd_data);
1862                         break;
1863                 case CEPH_OSD_OP_LIST_WATCHERS:
1864                         ceph_osdc_msg_data_add(req->r_reply,
1865                                                &op->list_watchers.response_data);
1866                         break;
1867
1868                 /* both */
1869                 case CEPH_OSD_OP_CALL:
1870                         WARN_ON(op->indata_len != op->cls.class_len +
1871                                                   op->cls.method_len +
1872                                                   op->cls.indata_len);
1873                         ceph_osdc_msg_data_add(msg, &op->cls.request_info);
1874                         /* optional, can be NONE */
1875                         ceph_osdc_msg_data_add(msg, &op->cls.request_data);
1876                         /* optional, can be NONE */
1877                         ceph_osdc_msg_data_add(req->r_reply,
1878                                                &op->cls.response_data);
1879                         break;
1880                 case CEPH_OSD_OP_NOTIFY:
1881                         ceph_osdc_msg_data_add(msg,
1882                                                &op->notify.request_data);
1883                         ceph_osdc_msg_data_add(req->r_reply,
1884                                                &op->notify.response_data);
1885                         break;
1886                 }
1887
1888                 data_len += op->indata_len;
1889         }
1890
1891         WARN_ON(data_len != msg->data_length);
1892 }
1893
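/*
 * Old-style pg_t encoding: u8 version (always 1), 64-bit pool, 32-bit
 * ps (seed) and a 32-bit "preferred" osd, always -1 these days.
 */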
1894 static void encode_pgid(void **p, const struct ceph_pg *pgid)
1895 {
1896         ceph_encode_8(p, 1);
1897         ceph_encode_64(p, pgid->pool);
1898         ceph_encode_32(p, pgid->seed);
1899         ceph_encode_32(p, -1); /* preferred */
1900 }
1901
1902 static void encode_spgid(void **p, const struct ceph_spg *spgid)
1903 {
1904         ceph_start_encoding(p, 1, 1, CEPH_PGID_ENCODING_LEN + 1);
1905         encode_pgid(p, &spgid->pgid);
1906         ceph_encode_8(p, spgid->shard);
1907 }
1908
1909 static void encode_oloc(void **p, void *end,
1910                         const struct ceph_object_locator *oloc)
1911 {
1912         ceph_start_encoding(p, 5, 4, ceph_oloc_encoding_size(oloc));
1913         ceph_encode_64(p, oloc->pool);
1914         ceph_encode_32(p, -1); /* preferred */
1915         ceph_encode_32(p, 0);  /* key len */
1916         if (oloc->pool_ns)
1917                 ceph_encode_string(p, end, oloc->pool_ns->str,
1918                                    oloc->pool_ns->len);
1919         else
1920                 ceph_encode_32(p, 0);
1921 }
1922
1923 static void encode_request_partial(struct ceph_osd_request *req,
1924                                    struct ceph_msg *msg)
1925 {
1926         void *p = msg->front.iov_base;
1927         void *const end = p + msg->front_alloc_len;
1928         u32 data_len = 0;
1929         int i;
1930
1931         if (req->r_flags & CEPH_OSD_FLAG_WRITE) {
1932                 /* snapshots aren't writeable */
1933                 WARN_ON(req->r_snapid != CEPH_NOSNAP);
1934         } else {
1935                 WARN_ON(req->r_mtime.tv_sec || req->r_mtime.tv_nsec ||
1936                         req->r_data_offset || req->r_snapc);
1937         }
1938
1939         setup_request_data(req, msg);
1940
1941         encode_spgid(&p, &req->r_t.spgid); /* actual spg */
1942         ceph_encode_32(&p, req->r_t.pgid.seed); /* raw hash */
1943         ceph_encode_32(&p, req->r_osdc->osdmap->epoch);
1944         ceph_encode_32(&p, req->r_flags);
1945
1946         /* reqid */
1947         ceph_start_encoding(&p, 2, 2, sizeof(struct ceph_osd_reqid));
1948         memset(p, 0, sizeof(struct ceph_osd_reqid));
1949         p += sizeof(struct ceph_osd_reqid);
1950
1951         /* trace */
1952         memset(p, 0, sizeof(struct ceph_blkin_trace_info));
1953         p += sizeof(struct ceph_blkin_trace_info);
1954
1955         ceph_encode_32(&p, 0); /* client_inc, always 0 */
1956         ceph_encode_timespec(p, &req->r_mtime);
1957         p += sizeof(struct ceph_timespec);
1958
1959         encode_oloc(&p, end, &req->r_t.target_oloc);
1960         ceph_encode_string(&p, end, req->r_t.target_oid.name,
1961                            req->r_t.target_oid.name_len);
1962
1963         /* ops, can imply data */
1964         ceph_encode_16(&p, req->r_num_ops);
1965         for (i = 0; i < req->r_num_ops; i++) {
1966                 data_len += osd_req_encode_op(p, &req->r_ops[i]);
1967                 p += sizeof(struct ceph_osd_op);
1968         }
1969
1970         ceph_encode_64(&p, req->r_snapid); /* snapid */
1971         if (req->r_snapc) {
1972                 ceph_encode_64(&p, req->r_snapc->seq);
1973                 ceph_encode_32(&p, req->r_snapc->num_snaps);
1974                 for (i = 0; i < req->r_snapc->num_snaps; i++)
1975                         ceph_encode_64(&p, req->r_snapc->snaps[i]);
1976         } else {
1977                 ceph_encode_64(&p, 0); /* snap_seq */
1978                 ceph_encode_32(&p, 0); /* snaps len */
1979         }
1980
1981         ceph_encode_32(&p, req->r_attempts); /* retry_attempt */
1982         BUG_ON(p > end - 8); /* space for features */
1983
1984         msg->hdr.version = cpu_to_le16(8); /* MOSDOp v8 */
1985         /* front_len is finalized in encode_request_finish() */
1986         msg->front.iov_len = p - msg->front.iov_base;
1987         msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
1988         msg->hdr.data_len = cpu_to_le32(data_len);
1989         /*
1990          * The header "data_off" is a hint to the receiver allowing it
1991          * to align received data into its buffers such that there's no
1992          * need to re-copy it before writing it to disk (direct I/O).
1993          */
1994         msg->hdr.data_off = cpu_to_le16(req->r_data_offset);
1995
1996         dout("%s req %p msg %p oid %s oid_len %d\n", __func__, req, msg,
1997              req->r_t.target_oid.name, req->r_t.target_oid.name_len);
1998 }
1999
2000 static void encode_request_finish(struct ceph_msg *msg)
2001 {
2002         void *p = msg->front.iov_base;
2003         void *const partial_end = p + msg->front.iov_len;
2004         void *const end = p + msg->front_alloc_len;
2005
2006         if (CEPH_HAVE_FEATURE(msg->con->peer_features, RESEND_ON_SPLIT)) {
2007                 /* luminous OSD -- encode features and be done */
2008                 p = partial_end;
2009                 ceph_encode_64(&p, msg->con->peer_features);
2010         } else {
2011                 struct {
2012                         char spgid[CEPH_ENCODING_START_BLK_LEN +
2013                                    CEPH_PGID_ENCODING_LEN + 1];
2014                         __le32 hash;
2015                         __le32 epoch;
2016                         __le32 flags;
2017                         char reqid[CEPH_ENCODING_START_BLK_LEN +
2018                                    sizeof(struct ceph_osd_reqid)];
2019                         char trace[sizeof(struct ceph_blkin_trace_info)];
2020                         __le32 client_inc;
2021                         struct ceph_timespec mtime;
2022                 } __packed head;
2023                 struct ceph_pg pgid;
2024                 void *oloc, *oid, *tail;
2025                 int oloc_len, oid_len, tail_len;
2026                 int len;
2027
2028                 /*
2029                  * Pre-luminous OSD -- reencode v8 into v4 using @head
2030                  * as a temporary buffer.  Encode the raw PG; the rest
2031                  * is just a matter of moving oloc, oid and tail blobs
2032                  * around.
2033                  */
2034                 memcpy(&head, p, sizeof(head));
2035                 p += sizeof(head);
2036
2037                 oloc = p;
2038                 p += CEPH_ENCODING_START_BLK_LEN;
2039                 pgid.pool = ceph_decode_64(&p);
2040                 p += 4 + 4; /* preferred, key len */
2041                 len = ceph_decode_32(&p);
2042                 p += len;   /* nspace */
2043                 oloc_len = p - oloc;
2044
2045                 oid = p;
2046                 len = ceph_decode_32(&p);
2047                 p += len;
2048                 oid_len = p - oid;
2049
2050                 tail = p;
2051                 tail_len = partial_end - p;
2052
2053                 p = msg->front.iov_base;
2054                 ceph_encode_copy(&p, &head.client_inc, sizeof(head.client_inc));
2055                 ceph_encode_copy(&p, &head.epoch, sizeof(head.epoch));
2056                 ceph_encode_copy(&p, &head.flags, sizeof(head.flags));
2057                 ceph_encode_copy(&p, &head.mtime, sizeof(head.mtime));
2058
2059                 /* reassert_version */
2060                 memset(p, 0, sizeof(struct ceph_eversion));
2061                 p += sizeof(struct ceph_eversion);
2062
2063                 BUG_ON(p >= oloc);
2064                 memmove(p, oloc, oloc_len);
2065                 p += oloc_len;
2066
2067                 pgid.seed = le32_to_cpu(head.hash);
2068                 encode_pgid(&p, &pgid); /* raw pg */
2069
2070                 BUG_ON(p >= oid);
2071                 memmove(p, oid, oid_len);
2072                 p += oid_len;
2073
2074                 /* tail -- ops, snapid, snapc, retry_attempt */
2075                 BUG_ON(p >= tail);
2076                 memmove(p, tail, tail_len);
2077                 p += tail_len;
2078
2079                 msg->hdr.version = cpu_to_le16(4); /* MOSDOp v4 */
2080         }
2081
2082         BUG_ON(p > end);
2083         msg->front.iov_len = p - msg->front.iov_base;
2084         msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
2085
2086         dout("%s msg %p tid %llu %u+%u+%u v%d\n", __func__, msg,
2087              le64_to_cpu(msg->hdr.tid), le32_to_cpu(msg->hdr.front_len),
2088              le32_to_cpu(msg->hdr.middle_len), le32_to_cpu(msg->hdr.data_len),
2089              le16_to_cpu(msg->hdr.version));
2090 }
2091
2092 /*
2093  * @req has to be assigned a tid and registered.
2094  */
2095 static void send_request(struct ceph_osd_request *req)
2096 {
2097         struct ceph_osd *osd = req->r_osd;
2098
2099         verify_osd_locked(osd);
2100         WARN_ON(osd->o_osd != req->r_t.osd);
2101
2102         /* backoff? */
2103         if (should_plug_request(req))
2104                 return;
2105
2106         /*
2107          * We may have a previously queued request message hanging
2108          * around.  Cancel it to avoid corrupting the msgr.
2109          */
2110         if (req->r_sent)
2111                 ceph_msg_revoke(req->r_request);
2112
2113         req->r_flags |= CEPH_OSD_FLAG_KNOWN_REDIR;
2114         if (req->r_attempts)
2115                 req->r_flags |= CEPH_OSD_FLAG_RETRY;
2116         else
2117                 WARN_ON(req->r_flags & CEPH_OSD_FLAG_RETRY);
2118
2119         encode_request_partial(req, req->r_request);
2120
2121         dout("%s req %p tid %llu to pgid %llu.%x spgid %llu.%xs%d osd%d e%u flags 0x%x attempt %d\n",
2122              __func__, req, req->r_tid, req->r_t.pgid.pool, req->r_t.pgid.seed,
2123              req->r_t.spgid.pgid.pool, req->r_t.spgid.pgid.seed,
2124              req->r_t.spgid.shard, osd->o_osd, req->r_t.epoch, req->r_flags,
2125              req->r_attempts);
2126
2127         req->r_t.paused = false;
2128         req->r_stamp = jiffies;
2129         req->r_attempts++;
2130
2131         req->r_sent = osd->o_incarnation;
2132         req->r_request->hdr.tid = cpu_to_le64(req->r_tid);
2133         ceph_con_send(&osd->o_con, ceph_msg_get(req->r_request));
2134 }
2135
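/*
 * Ask the monitor for the osdmap following the one we already have.
 * While the cluster is full or paused, subscribe continuously -- any
 * new epoch may be the one that clears the condition; otherwise a
 * one-shot subscription is enough.
 */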
2136 static void maybe_request_map(struct ceph_osd_client *osdc)
2137 {
2138         bool continuous = false;
2139
2140         verify_osdc_locked(osdc);
2141         WARN_ON(!osdc->osdmap->epoch);
2142
2143         if (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
2144             ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD) ||
2145             ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR)) {
2146                 dout("%s osdc %p continuous\n", __func__, osdc);
2147                 continuous = true;
2148         } else {
2149                 dout("%s osdc %p onetime\n", __func__, osdc);
2150         }
2151
2152         if (ceph_monc_want_map(&osdc->client->monc, CEPH_SUB_OSDMAP,
2153                                osdc->osdmap->epoch + 1, continuous))
2154                 ceph_monc_renew_subs(&osdc->client->monc);
2155 }
2156
2157 static void complete_request(struct ceph_osd_request *req, int err);
2158 static void send_map_check(struct ceph_osd_request *req);
2159
2160 static void __submit_request(struct ceph_osd_request *req, bool wrlocked)
2161 {
2162         struct ceph_osd_client *osdc = req->r_osdc;
2163         struct ceph_osd *osd;
2164         enum calc_target_result ct_res;
2165         bool need_send = false;
2166         bool promoted = false;
2167         bool need_abort = false;
2168
2169         WARN_ON(req->r_tid);
2170         dout("%s req %p wrlocked %d\n", __func__, req, wrlocked);
2171
2172 again:
2173         ct_res = calc_target(osdc, &req->r_t, NULL, false);
2174         if (ct_res == CALC_TARGET_POOL_DNE && !wrlocked)
2175                 goto promote;
2176
2177         osd = lookup_create_osd(osdc, req->r_t.osd, wrlocked);
2178         if (IS_ERR(osd)) {
2179                 WARN_ON(PTR_ERR(osd) != -EAGAIN || wrlocked);
2180                 goto promote;
2181         }
2182
2183         if (osdc->osdmap->epoch < osdc->epoch_barrier) {
2184                 dout("req %p epoch %u barrier %u\n", req, osdc->osdmap->epoch,
2185                      osdc->epoch_barrier);
2186                 req->r_t.paused = true;
2187                 maybe_request_map(osdc);
2188         } else if ((req->r_flags & CEPH_OSD_FLAG_WRITE) &&
2189                    ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR)) {
2190                 dout("req %p pausewr\n", req);
2191                 req->r_t.paused = true;
2192                 maybe_request_map(osdc);
2193         } else if ((req->r_flags & CEPH_OSD_FLAG_READ) &&
2194                    ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD)) {
2195                 dout("req %p pauserd\n", req);
2196                 req->r_t.paused = true;
2197                 maybe_request_map(osdc);
2198         } else if ((req->r_flags & CEPH_OSD_FLAG_WRITE) &&
2199                    !(req->r_flags & (CEPH_OSD_FLAG_FULL_TRY |
2200                                      CEPH_OSD_FLAG_FULL_FORCE)) &&
2201                    (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
2202                     pool_full(osdc, req->r_t.base_oloc.pool))) {
2203                 dout("req %p full/pool_full\n", req);
2204                 pr_warn_ratelimited("FULL or reached pool quota\n");
2205                 req->r_t.paused = true;
2206                 maybe_request_map(osdc);
2207                 if (req->r_abort_on_full)
2208                         need_abort = true;
2209         } else if (!osd_homeless(osd)) {
2210                 need_send = true;
2211         } else {
2212                 maybe_request_map(osdc);
2213         }
2214
2215         mutex_lock(&osd->lock);
2216         /*
2217          * Assign the tid atomically with send_request() to protect
2218          * multiple writes to the same object from racing with each
2219          * other, resulting in out of order ops on the OSDs.
2220          */
2221         req->r_tid = atomic64_inc_return(&osdc->last_tid);
2222         link_request(osd, req);
2223         if (need_send)
2224                 send_request(req);
2225         else if (need_abort)
2226                 complete_request(req, -ENOSPC);
2227         mutex_unlock(&osd->lock);
2228
2229         if (ct_res == CALC_TARGET_POOL_DNE)
2230                 send_map_check(req);
2231
2232         if (promoted)
2233                 downgrade_write(&osdc->lock);
2234         return;
2235
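        /*
         * rwsems have no upgrade primitive: drop the read lock, grab
         * the write lock and start over, as the map (and with it the
         * calculated target) may have changed in the unlocked window.
         */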
2236 promote:
2237         up_read(&osdc->lock);
2238         down_write(&osdc->lock);
2239         wrlocked = true;
2240         promoted = true;
2241         goto again;
2242 }
2243
2244 static void account_request(struct ceph_osd_request *req)
2245 {
2246         WARN_ON(req->r_flags & (CEPH_OSD_FLAG_ACK | CEPH_OSD_FLAG_ONDISK));
2247         WARN_ON(!(req->r_flags & (CEPH_OSD_FLAG_READ | CEPH_OSD_FLAG_WRITE)));
2248
2249         req->r_flags |= CEPH_OSD_FLAG_ONDISK;
2250         atomic_inc(&req->r_osdc->num_requests);
2251
2252         req->r_start_stamp = jiffies;
2253 }
2254
2255 static void submit_request(struct ceph_osd_request *req, bool wrlocked)
2256 {
2257         ceph_osdc_get_request(req);
2258         account_request(req);
2259         __submit_request(req, wrlocked);
2260 }
2261
2262 static void finish_request(struct ceph_osd_request *req)
2263 {
2264         struct ceph_osd_client *osdc = req->r_osdc;
2265
2266         WARN_ON(lookup_request_mc(&osdc->map_checks, req->r_tid));
2267         dout("%s req %p tid %llu\n", __func__, req, req->r_tid);
2268
2269         if (req->r_osd)
2270                 unlink_request(req->r_osd, req);
2271         atomic_dec(&osdc->num_requests);
2272
2273         /*
2274          * If an OSD has failed or returned and a request has been sent
2275          * twice, it's possible to get a reply and end up here while the
2276          * request message is queued for delivery.  We will ignore the
2277          * reply, so not a big deal, but better to try and catch it.
2278          */
2279         ceph_msg_revoke(req->r_request);
2280         ceph_msg_revoke_incoming(req->r_reply);
2281 }
2282
2283 static void __complete_request(struct ceph_osd_request *req)
2284 {
2285         if (req->r_callback) {
2286                 dout("%s req %p tid %llu cb %pf result %d\n", __func__, req,
2287                      req->r_tid, req->r_callback, req->r_result);
2288                 req->r_callback(req);
2289         }
2290 }
2291
2292 /*
2293  * This is open-coded in handle_reply().
2294  */
2295 static void complete_request(struct ceph_osd_request *req, int err)
2296 {
2297         dout("%s req %p tid %llu err %d\n", __func__, req, req->r_tid, err);
2298
2299         req->r_result = err;
2300         finish_request(req);
2301         __complete_request(req);
2302         complete_all(&req->r_completion);
2303         ceph_osdc_put_request(req);
2304 }
2305
2306 static void cancel_map_check(struct ceph_osd_request *req)
2307 {
2308         struct ceph_osd_client *osdc = req->r_osdc;
2309         struct ceph_osd_request *lookup_req;
2310
2311         verify_osdc_wrlocked(osdc);
2312
2313         lookup_req = lookup_request_mc(&osdc->map_checks, req->r_tid);
2314         if (!lookup_req)
2315                 return;
2316
2317         WARN_ON(lookup_req != req);
2318         erase_request_mc(&osdc->map_checks, req);
2319         ceph_osdc_put_request(req);
2320 }
2321
2322 static void cancel_request(struct ceph_osd_request *req)
2323 {
2324         dout("%s req %p tid %llu\n", __func__, req, req->r_tid);
2325
2326         cancel_map_check(req);
2327         finish_request(req);
2328         complete_all(&req->r_completion);
2329         ceph_osdc_put_request(req);
2330 }
2331
2332 static void abort_request(struct ceph_osd_request *req, int err)
2333 {
2334         dout("%s req %p tid %llu err %d\n", __func__, req, req->r_tid, err);
2335
2336         cancel_map_check(req);
2337         complete_request(req, err);
2338 }
2339
2340 static void update_epoch_barrier(struct ceph_osd_client *osdc, u32 eb)
2341 {
2342         if (likely(eb > osdc->epoch_barrier)) {
2343                 dout("updating epoch_barrier from %u to %u\n",
2344                                 osdc->epoch_barrier, eb);
2345                 osdc->epoch_barrier = eb;
2346                 /* Request map if we're not to the barrier yet */
2347                 if (eb > osdc->osdmap->epoch)
2348                         maybe_request_map(osdc);
2349         }
2350 }
2351
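/*
 * Raise the epoch barrier, taking osdc->lock for write only if the
 * read-locked check says it needs to move; update_epoch_barrier()
 * rechecks under the write lock (double-checked locking).
 */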
2352 void ceph_osdc_update_epoch_barrier(struct ceph_osd_client *osdc, u32 eb)
2353 {
2354         down_read(&osdc->lock);
2355         if (unlikely(eb > osdc->epoch_barrier)) {
2356                 up_read(&osdc->lock);
2357                 down_write(&osdc->lock);
2358                 update_epoch_barrier(osdc, eb);
2359                 up_write(&osdc->lock);
2360         } else {
2361                 up_read(&osdc->lock);
2362         }
2363 }
2364 EXPORT_SYMBOL(ceph_osdc_update_epoch_barrier);
2365
2366 /*
2367  * Drop all pending requests that are stalled waiting on a full condition to
2368  * clear, and complete them with -ENOSPC as the return code. Set the
2369  * osdc->epoch_barrier to the latest map epoch that we've seen if any were
2370  * cancelled.
2371  */
2372 static void ceph_osdc_abort_on_full(struct ceph_osd_client *osdc)
2373 {
2374         struct rb_node *n;
2375         bool victims = false;
2376
2377         dout("%s osdc %p\n", __func__, osdc);
2378
2379         if (!ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) && !have_pool_full(osdc))
2380                 goto out;
2381
2382         /* Scan list and see if there is anything to abort */
2383         for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
2384                 struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
2385                 struct rb_node *m;
2386
2387                 m = rb_first(&osd->o_requests);
2388                 while (m) {
2389                         struct ceph_osd_request *req = rb_entry(m,
2390                                         struct ceph_osd_request, r_node);
2391                         m = rb_next(m);
2392
2393                         if (req->r_abort_on_full) {
2394                                 victims = true;
2395                                 break;
2396                         }
2397                 }
2398                 if (victims)
2399                         break;
2400         }
2401
2402         if (!victims)
2403                 goto out;
2404
2405         /*
2406          * Update the barrier to current epoch if it's behind that point,
2407          * since we know we have some calls to be aborted in the tree.
2408          */
2409         update_epoch_barrier(osdc, osdc->osdmap->epoch);
2410
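        /*
         * Second pass: abort the requests that are marked
         * r_abort_on_full and are actually stuck behind the full flag
         * or a full pool.
         */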
2411         for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
2412                 struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
2413                 struct rb_node *m;
2414
2415                 m = rb_first(&osd->o_requests);
2416                 while (m) {
2417                         struct ceph_osd_request *req = rb_entry(m,
2418                                         struct ceph_osd_request, r_node);
2419                         m = rb_next(m);
2420
2421                         if (req->r_abort_on_full &&
2422                             (ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
2423                              pool_full(osdc, req->r_t.target_oloc.pool)))
2424                                 abort_request(req, -ENOSPC);
2425                 }
2426         }
2427 out:
2428         dout("%s done, epoch_barrier %u\n", __func__, osdc->epoch_barrier);
2429 }
2430
2431 static void check_pool_dne(struct ceph_osd_request *req)
2432 {
2433         struct ceph_osd_client *osdc = req->r_osdc;
2434         struct ceph_osdmap *map = osdc->osdmap;
2435
2436         verify_osdc_wrlocked(osdc);
2437         WARN_ON(!map->epoch);
2438
2439         if (req->r_attempts) {
2440                 /*
2441                  * We sent a request earlier, which means that
2442                  * previously the pool existed, and now it does not
2443                  * (i.e., it was deleted).
2444                  */
2445                 req->r_map_dne_bound = map->epoch;
2446                 dout("%s req %p tid %llu pool disappeared\n", __func__, req,
2447                      req->r_tid);
2448         } else {
2449                 dout("%s req %p tid %llu map_dne_bound %u have %u\n", __func__,
2450                      req, req->r_tid, req->r_map_dne_bound, map->epoch);
2451         }
2452
2453         if (req->r_map_dne_bound) {
2454                 if (map->epoch >= req->r_map_dne_bound) {
2455                         /* we had a new enough map */
2456                         pr_info_ratelimited("tid %llu pool does not exist\n",
2457                                             req->r_tid);
2458                         complete_request(req, -ENOENT);
2459                 }
2460         } else {
2461                 send_map_check(req);
2462         }
2463 }
2464
2465 static void map_check_cb(struct ceph_mon_generic_request *greq)
2466 {
2467         struct ceph_osd_client *osdc = &greq->monc->client->osdc;
2468         struct ceph_osd_request *req;
2469         u64 tid = greq->private_data;
2470
2471         WARN_ON(greq->result || !greq->u.newest);
2472
2473         down_write(&osdc->lock);
2474         req = lookup_request_mc(&osdc->map_checks, tid);
2475         if (!req) {
2476                 dout("%s tid %llu dne\n", __func__, tid);
2477                 goto out_unlock;
2478         }
2479
2480         dout("%s req %p tid %llu map_dne_bound %u newest %llu\n", __func__,
2481              req, req->r_tid, req->r_map_dne_bound, greq->u.newest);
2482         if (!req->r_map_dne_bound)
2483                 req->r_map_dne_bound = greq->u.newest;
2484         erase_request_mc(&osdc->map_checks, req);
2485         check_pool_dne(req);
2486
2487         ceph_osdc_put_request(req);
2488 out_unlock:
2489         up_write(&osdc->lock);
2490 }
2491
2492 static void send_map_check(struct ceph_osd_request *req)
2493 {
2494         struct ceph_osd_client *osdc = req->r_osdc;
2495         struct ceph_osd_request *lookup_req;
2496         int ret;
2497
2498         verify_osdc_wrlocked(osdc);
2499
2500         lookup_req = lookup_request_mc(&osdc->map_checks, req->r_tid);
2501         if (lookup_req) {
2502                 WARN_ON(lookup_req != req);
2503                 return;
2504         }
2505
2506         ceph_osdc_get_request(req);
2507         insert_request_mc(&osdc->map_checks, req);
2508         ret = ceph_monc_get_version_async(&osdc->client->monc, "osdmap",
2509                                           map_check_cb, req->r_tid);
2510         WARN_ON(ret);
2511 }
2512
2513 /*
2514  * lingering requests, watch/notify v2 infrastructure
2515  */
2516 static void linger_release(struct kref *kref)
2517 {
2518         struct ceph_osd_linger_request *lreq =
2519             container_of(kref, struct ceph_osd_linger_request, kref);
2520
2521         dout("%s lreq %p reg_req %p ping_req %p\n", __func__, lreq,
2522              lreq->reg_req, lreq->ping_req);
2523         WARN_ON(!RB_EMPTY_NODE(&lreq->node));
2524         WARN_ON(!RB_EMPTY_NODE(&lreq->osdc_node));
2525         WARN_ON(!RB_EMPTY_NODE(&lreq->mc_node));
2526         WARN_ON(!list_empty(&lreq->scan_item));
2527         WARN_ON(!list_empty(&lreq->pending_lworks));
2528         WARN_ON(lreq->osd);
2529
2530         if (lreq->reg_req)
2531                 ceph_osdc_put_request(lreq->reg_req);
2532         if (lreq->ping_req)
2533                 ceph_osdc_put_request(lreq->ping_req);
2534         target_destroy(&lreq->t);
2535         kfree(lreq);
2536 }
2537
2538 static void linger_put(struct ceph_osd_linger_request *lreq)
2539 {
2540         if (lreq)
2541                 kref_put(&lreq->kref, linger_release);
2542 }
2543
2544 static struct ceph_osd_linger_request *
2545 linger_get(struct ceph_osd_linger_request *lreq)
2546 {
2547         kref_get(&lreq->kref);
2548         return lreq;
2549 }
2550
2551 static struct ceph_osd_linger_request *
2552 linger_alloc(struct ceph_osd_client *osdc)
2553 {
2554         struct ceph_osd_linger_request *lreq;
2555
2556         lreq = kzalloc(sizeof(*lreq), GFP_NOIO);
2557         if (!lreq)
2558                 return NULL;
2559
2560         kref_init(&lreq->kref);
2561         mutex_init(&lreq->lock);
2562         RB_CLEAR_NODE(&lreq->node);
2563         RB_CLEAR_NODE(&lreq->osdc_node);
2564         RB_CLEAR_NODE(&lreq->mc_node);
2565         INIT_LIST_HEAD(&lreq->scan_item);
2566         INIT_LIST_HEAD(&lreq->pending_lworks);
2567         init_completion(&lreq->reg_commit_wait);
2568         init_completion(&lreq->notify_finish_wait);
2569
2570         lreq->osdc = osdc;
2571         target_init(&lreq->t);
2572
2573         dout("%s lreq %p\n", __func__, lreq);
2574         return lreq;
2575 }
2576
2577 DEFINE_RB_INSDEL_FUNCS(linger, struct ceph_osd_linger_request, linger_id, node)
2578 DEFINE_RB_FUNCS(linger_osdc, struct ceph_osd_linger_request, linger_id, osdc_node)
2579 DEFINE_RB_FUNCS(linger_mc, struct ceph_osd_linger_request, linger_id, mc_node)
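
/*
 * A linger request can sit in up to three trees at once: ->node in its
 * OSD session (o_linger_requests), ->osdc_node in the client-wide
 * registration tree (linger_requests) and ->mc_node while an osdmap
 * check is in flight (linger_map_checks).
 */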
2580
2581 /*
2582  * Create linger request <-> OSD session relation.
2583  *
2584  * @lreq has to be registered, @osd may be homeless.
2585  */
2586 static void link_linger(struct ceph_osd *osd,
2587                         struct ceph_osd_linger_request *lreq)
2588 {
2589         verify_osd_locked(osd);
2590         WARN_ON(!lreq->linger_id || lreq->osd);
2591         dout("%s osd %p osd%d lreq %p linger_id %llu\n", __func__, osd,
2592              osd->o_osd, lreq, lreq->linger_id);
2593
2594         if (!osd_homeless(osd))
2595                 __remove_osd_from_lru(osd);
2596         else
2597                 atomic_inc(&osd->o_osdc->num_homeless);
2598
2599         get_osd(osd);
2600         insert_linger(&osd->o_linger_requests, lreq);
2601         lreq->osd = osd;
2602 }
2603
2604 static void unlink_linger(struct ceph_osd *osd,
2605                           struct ceph_osd_linger_request *lreq)
2606 {
2607         verify_osd_locked(osd);
2608         WARN_ON(lreq->osd != osd);
2609         dout("%s osd %p osd%d lreq %p linger_id %llu\n", __func__, osd,
2610              osd->o_osd, lreq, lreq->linger_id);
2611
2612         lreq->osd = NULL;
2613         erase_linger(&osd->o_linger_requests, lreq);
2614         put_osd(osd);
2615
2616         if (!osd_homeless(osd))
2617                 maybe_move_osd_to_lru(osd);
2618         else
2619                 atomic_dec(&osd->o_osdc->num_homeless);
2620 }
2621
2622 static bool __linger_registered(struct ceph_osd_linger_request *lreq)
2623 {
2624         verify_osdc_locked(lreq->osdc);
2625
2626         return !RB_EMPTY_NODE(&lreq->osdc_node);
2627 }
2628
2629 static bool linger_registered(struct ceph_osd_linger_request *lreq)
2630 {
2631         struct ceph_osd_client *osdc = lreq->osdc;
2632         bool registered;
2633
2634         down_read(&osdc->lock);
2635         registered = __linger_registered(lreq);
2636         up_read(&osdc->lock);
2637
2638         return registered;
2639 }
2640
2641 static void linger_register(struct ceph_osd_linger_request *lreq)
2642 {
2643         struct ceph_osd_client *osdc = lreq->osdc;
2644
2645         verify_osdc_wrlocked(osdc);
2646         WARN_ON(lreq->linger_id);
2647
2648         linger_get(lreq);
2649         lreq->linger_id = ++osdc->last_linger_id;
2650         insert_linger_osdc(&osdc->linger_requests, lreq);
2651 }
2652
2653 static void linger_unregister(struct ceph_osd_linger_request *lreq)
2654 {
2655         struct ceph_osd_client *osdc = lreq->osdc;
2656
2657         verify_osdc_wrlocked(osdc);
2658
2659         erase_linger_osdc(&osdc->linger_requests, lreq);
2660         linger_put(lreq);
2661 }
2662
2663 static void cancel_linger_request(struct ceph_osd_request *req)
2664 {
2665         struct ceph_osd_linger_request *lreq = req->r_priv;
2666
2667         WARN_ON(!req->r_linger);
2668         cancel_request(req);
2669         linger_put(lreq);
2670 }
2671
2672 struct linger_work {
2673         struct work_struct work;
2674         struct ceph_osd_linger_request *lreq;
2675         struct list_head pending_item;
2676         unsigned long queued_stamp;
2677
2678         union {
2679                 struct {
2680                         u64 notify_id;
2681                         u64 notifier_id;
2682                         void *payload; /* points into @msg front */
2683                         size_t payload_len;
2684
2685                         struct ceph_msg *msg; /* for ceph_msg_put() */
2686                 } notify;
2687                 struct {
2688                         int err;
2689                 } error;
2690         };
2691 };
2692
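/*
 * linger_works are allocated by lwork_alloc(), queued on the client's
 * notify_wq by lwork_queue() and freed with lwork_free() from the work
 * function itself; ->pending_item tracks them while they are queued.
 */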
2693 static struct linger_work *lwork_alloc(struct ceph_osd_linger_request *lreq,
2694                                        work_func_t workfn)
2695 {
2696         struct linger_work *lwork;
2697
2698         lwork = kzalloc(sizeof(*lwork), GFP_NOIO);
2699         if (!lwork)
2700                 return NULL;
2701
2702         INIT_WORK(&lwork->work, workfn);
2703         INIT_LIST_HEAD(&lwork->pending_item);
2704         lwork->lreq = linger_get(lreq);
2705
2706         return lwork;
2707 }
2708
2709 static void lwork_free(struct linger_work *lwork)
2710 {
2711         struct ceph_osd_linger_request *lreq = lwork->lreq;
2712
2713         mutex_lock(&lreq->lock);
2714         list_del(&lwork->pending_item);
2715         mutex_unlock(&lreq->lock);
2716
2717         linger_put(lreq);
2718         kfree(lwork);
2719 }
2720
2721 static void lwork_queue(struct linger_work *lwork)
2722 {
2723         struct ceph_osd_linger_request *lreq = lwork->lreq;
2724         struct ceph_osd_client *osdc = lreq->osdc;
2725
2726         verify_lreq_locked(lreq);
2727         WARN_ON(!list_empty(&lwork->pending_item));
2728
2729         lwork->queued_stamp = jiffies;
2730         list_add_tail(&lwork->pending_item, &lreq->pending_lworks);
2731         queue_work(osdc->notify_wq, &lwork->work);
2732 }
2733
2734 static void do_watch_notify(struct work_struct *w)
2735 {
2736         struct linger_work *lwork = container_of(w, struct linger_work, work);
2737         struct ceph_osd_linger_request *lreq = lwork->lreq;
2738
2739         if (!linger_registered(lreq)) {
2740                 dout("%s lreq %p not registered\n", __func__, lreq);
2741                 goto out;
2742         }
2743
2744         WARN_ON(!lreq->is_watch);
2745         dout("%s lreq %p notify_id %llu notifier_id %llu payload_len %zu\n",
2746              __func__, lreq, lwork->notify.notify_id, lwork->notify.notifier_id,
2747              lwork->notify.payload_len);
2748         lreq->wcb(lreq->data, lwork->notify.notify_id, lreq->linger_id,
2749                   lwork->notify.notifier_id, lwork->notify.payload,
2750                   lwork->notify.payload_len);
2751
2752 out:
2753         ceph_msg_put(lwork->notify.msg);
2754         lwork_free(lwork);
2755 }
2756
2757 static void do_watch_error(struct work_struct *w)
2758 {
2759         struct linger_work *lwork = container_of(w, struct linger_work, work);
2760         struct ceph_osd_linger_request *lreq = lwork->lreq;
2761
2762         if (!linger_registered(lreq)) {
2763                 dout("%s lreq %p not registered\n", __func__, lreq);
2764                 goto out;
2765         }
2766
2767         dout("%s lreq %p err %d\n", __func__, lreq, lwork->error.err);
2768         lreq->errcb(lreq->data, lreq->linger_id, lwork->error.err);
2769
2770 out:
2771         lwork_free(lwork);
2772 }
2773
2774 static void queue_watch_error(struct ceph_osd_linger_request *lreq)
2775 {
2776         struct linger_work *lwork;
2777
2778         lwork = lwork_alloc(lreq, do_watch_error);
2779         if (!lwork) {
2780                 pr_err("failed to allocate error-lwork\n");
2781                 return;
2782         }
2783
2784         lwork->error.err = lreq->last_error;
2785         lwork_queue(lwork);
2786 }
2787
2788 static void linger_reg_commit_complete(struct ceph_osd_linger_request *lreq,
2789                                        int result)
2790 {
2791         if (!completion_done(&lreq->reg_commit_wait)) {
2792                 lreq->reg_commit_error = (result <= 0 ? result : 0);
2793                 complete_all(&lreq->reg_commit_wait);
2794         }
2795 }
2796
2797 static void linger_commit_cb(struct ceph_osd_request *req)
2798 {
2799         struct ceph_osd_linger_request *lreq = req->r_priv;
2800
2801         mutex_lock(&lreq->lock);
2802         dout("%s lreq %p linger_id %llu result %d\n", __func__, lreq,
2803              lreq->linger_id, req->r_result);
2804         linger_reg_commit_complete(lreq, req->r_result);
2805         lreq->committed = true;
2806
2807         if (!lreq->is_watch) {
2808                 struct ceph_osd_data *osd_data =
2809                     osd_req_op_data(req, 0, notify, response_data);
2810                 void *p = page_address(osd_data->pages[0]);
2811
2812                 WARN_ON(req->r_ops[0].op != CEPH_OSD_OP_NOTIFY ||
2813                         osd_data->type != CEPH_OSD_DATA_TYPE_PAGES);
2814
2815                 /* make note of the notify_id */
2816                 if (req->r_ops[0].outdata_len >= sizeof(u64)) {
2817                         lreq->notify_id = ceph_decode_64(&p);
2818                         dout("lreq %p notify_id %llu\n", lreq,
2819                              lreq->notify_id);
2820                 } else {
2821                         dout("lreq %p no notify_id\n", lreq);
2822                 }
2823         }
2824
2825         mutex_unlock(&lreq->lock);
2826         linger_put(lreq);
2827 }
2828
2829 static int normalize_watch_error(int err)
2830 {
2831         /*
2832          * Translate ENOENT -> ENOTCONN so that a delete->disconnection
2833          * notification and a failure to reconnect because we raced with
2834          * the delete appear the same to the user.
2835          */
2836         if (err == -ENOENT)
2837                 err = -ENOTCONN;
2838
2839         return err;
2840 }
2841
2842 static void linger_reconnect_cb(struct ceph_osd_request *req)
2843 {
2844         struct ceph_osd_linger_request *lreq = req->r_priv;
2845
2846         mutex_lock(&lreq->lock);
2847         dout("%s lreq %p linger_id %llu result %d last_error %d\n", __func__,
2848              lreq, lreq->linger_id, req->r_result, lreq->last_error);
2849         if (req->r_result < 0) {
2850                 if (!lreq->last_error) {
2851                         lreq->last_error = normalize_watch_error(req->r_result);
2852                         queue_watch_error(lreq);
2853                 }
2854         }
2855
2856         mutex_unlock(&lreq->lock);
2857         linger_put(lreq);
2858 }
2859
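/*
 * (Re)send the registration request for @lreq: a RECONNECT with a
 * bumped register_gen if the watch has already committed, the initial
 * WATCH or NOTIFY otherwise.
 */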
2860 static void send_linger(struct ceph_osd_linger_request *lreq)
2861 {
2862         struct ceph_osd_request *req = lreq->reg_req;
2863         struct ceph_osd_req_op *op = &req->r_ops[0];
2864
2865         verify_osdc_wrlocked(req->r_osdc);
2866         dout("%s lreq %p linger_id %llu\n", __func__, lreq, lreq->linger_id);
2867
2868         if (req->r_osd)
2869                 cancel_linger_request(req);
2870
2871         request_reinit(req);
2872         ceph_oid_copy(&req->r_base_oid, &lreq->t.base_oid);
2873         ceph_oloc_copy(&req->r_base_oloc, &lreq->t.base_oloc);
2874         req->r_flags = lreq->t.flags;
2875         req->r_mtime = lreq->mtime;
2876
2877         mutex_lock(&lreq->lock);
2878         if (lreq->is_watch && lreq->committed) {
2879                 WARN_ON(op->op != CEPH_OSD_OP_WATCH ||
2880                         op->watch.cookie != lreq->linger_id);
2881                 op->watch.op = CEPH_OSD_WATCH_OP_RECONNECT;
2882                 op->watch.gen = ++lreq->register_gen;
2883                 dout("lreq %p reconnect register_gen %u\n", lreq,
2884                      op->watch.gen);
2885                 req->r_callback = linger_reconnect_cb;
2886         } else {
2887                 if (!lreq->is_watch)
2888                         lreq->notify_id = 0;
2889                 else
2890                         WARN_ON(op->watch.op != CEPH_OSD_WATCH_OP_WATCH);
2891                 dout("lreq %p register\n", lreq);
2892                 req->r_callback = linger_commit_cb;
2893         }
2894         mutex_unlock(&lreq->lock);
2895
2896         req->r_priv = linger_get(lreq);
2897         req->r_linger = true;
2898
2899         submit_request(req, true);
2900 }
2901
2902 static void linger_ping_cb(struct ceph_osd_request *req)
2903 {
2904         struct ceph_osd_linger_request *lreq = req->r_priv;
2905
2906         mutex_lock(&lreq->lock);
2907         dout("%s lreq %p linger_id %llu result %d ping_sent %lu last_error %d\n",
2908              __func__, lreq, lreq->linger_id, req->r_result, lreq->ping_sent,
2909              lreq->last_error);
2910         if (lreq->register_gen == req->r_ops[0].watch.gen) {
2911                 if (!req->r_result) {
2912                         lreq->watch_valid_thru = lreq->ping_sent;
2913                 } else if (!lreq->last_error) {
2914                         lreq->last_error = normalize_watch_error(req->r_result);
2915                         queue_watch_error(lreq);
2916                 }
2917         } else {
2918                 dout("lreq %p register_gen %u ignoring old pong %u\n", lreq,
2919                      lreq->register_gen, req->r_ops[0].watch.gen);
2920         }
2921
2922         mutex_unlock(&lreq->lock);
2923         linger_put(lreq);
2924 }
2925
2926 static void send_linger_ping(struct ceph_osd_linger_request *lreq)
2927 {
2928         struct ceph_osd_client *osdc = lreq->osdc;
2929         struct ceph_osd_request *req = lreq->ping_req;
2930         struct ceph_osd_req_op *op = &req->r_ops[0];
2931
2932         if (ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD)) {
2933                 dout("%s PAUSERD\n", __func__);
2934                 return;
2935         }
2936
2937         lreq->ping_sent = jiffies;
2938         dout("%s lreq %p linger_id %llu ping_sent %lu register_gen %u\n",
2939              __func__, lreq, lreq->linger_id, lreq->ping_sent,
2940              lreq->register_gen);
2941
2942         if (req->r_osd)
2943                 cancel_linger_request(req);
2944
2945         request_reinit(req);
2946         target_copy(&req->r_t, &lreq->t);
2947
2948         WARN_ON(op->op != CEPH_OSD_OP_WATCH ||
2949                 op->watch.cookie != lreq->linger_id ||
2950                 op->watch.op != CEPH_OSD_WATCH_OP_PING);
2951         op->watch.gen = lreq->register_gen;
2952         req->r_callback = linger_ping_cb;
2953         req->r_priv = linger_get(lreq);
2954         req->r_linger = true;
2955
2956         ceph_osdc_get_request(req);
2957         account_request(req);
2958         req->r_tid = atomic64_inc_return(&osdc->last_tid);
2959         link_request(lreq->osd, req);
2960         send_request(req);
2961 }
2962
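/*
 * Compute the initial target, link the linger onto the resulting OSD
 * session and send the registration.
 */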
2963 static void linger_submit(struct ceph_osd_linger_request *lreq)
2964 {
2965         struct ceph_osd_client *osdc = lreq->osdc;
2966         struct ceph_osd *osd;
2967
2968         calc_target(osdc, &lreq->t, NULL, false);
2969         osd = lookup_create_osd(osdc, lreq->t.osd, true);
2970         link_linger(osd, lreq);
2971
2972         send_linger(lreq);
2973 }
2974
2975 static void cancel_linger_map_check(struct ceph_osd_linger_request *lreq)
2976 {
2977         struct ceph_osd_client *osdc = lreq->osdc;
2978         struct ceph_osd_linger_request *lookup_lreq;
2979
2980         verify_osdc_wrlocked(osdc);
2981
2982         lookup_lreq = lookup_linger_mc(&osdc->linger_map_checks,
2983                                        lreq->linger_id);
2984         if (!lookup_lreq)
2985                 return;
2986
2987         WARN_ON(lookup_lreq != lreq);
2988         erase_linger_mc(&osdc->linger_map_checks, lreq);
2989         linger_put(lreq);
2990 }
2991
2992 /*
2993  * @lreq has to be both registered and linked.
2994  */
2995 static void __linger_cancel(struct ceph_osd_linger_request *lreq)
2996 {
2997         if (lreq->is_watch && lreq->ping_req->r_osd)
2998                 cancel_linger_request(lreq->ping_req);
2999         if (lreq->reg_req->r_osd)
3000                 cancel_linger_request(lreq->reg_req);
3001         cancel_linger_map_check(lreq);
3002         unlink_linger(lreq->osd, lreq);
3003         linger_unregister(lreq);
3004 }
3005
3006 static void linger_cancel(struct ceph_osd_linger_request *lreq)
3007 {
3008         struct ceph_osd_client *osdc = lreq->osdc;
3009
3010         down_write(&osdc->lock);
3011         if (__linger_registered(lreq))
3012                 __linger_cancel(lreq);
3013         up_write(&osdc->lock);
3014 }
3015
3016 static void send_linger_map_check(struct ceph_osd_linger_request *lreq);
3017
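/*
 * Called when the target pool may not exist.  If we do not yet have
 * an upper bound (map_dne_bound), ask the monitor for the newest
 * osdmap epoch.  Once our map is at least that new, fail the
 * registration with -ENOENT and cancel the linger.
 */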
3018 static void check_linger_pool_dne(struct ceph_osd_linger_request *lreq)
3019 {
3020         struct ceph_osd_client *osdc = lreq->osdc;
3021         struct ceph_osdmap *map = osdc->osdmap;
3022
3023         verify_osdc_wrlocked(osdc);
3024         WARN_ON(!map->epoch);
3025
3026         if (lreq->register_gen) {
3027                 lreq->map_dne_bound = map->epoch;
3028                 dout("%s lreq %p linger_id %llu pool disappeared\n", __func__,
3029                      lreq, lreq->linger_id);
3030         } else {
3031                 dout("%s lreq %p linger_id %llu map_dne_bound %u have %u\n",
3032                      __func__, lreq, lreq->linger_id, lreq->map_dne_bound,
3033                      map->epoch);
3034         }
3035
3036         if (lreq->map_dne_bound) {
3037                 if (map->epoch >= lreq->map_dne_bound) {
3038                         /* we had a new enough map */
3039                         pr_info("linger_id %llu pool does not exist\n",
3040                                 lreq->linger_id);
3041                         linger_reg_commit_complete(lreq, -ENOENT);
3042                         __linger_cancel(lreq);
3043                 }
3044         } else {
3045                 send_linger_map_check(lreq);
3046         }
3047 }
3048
3049 static void linger_map_check_cb(struct ceph_mon_generic_request *greq)
3050 {
3051         struct ceph_osd_client *osdc = &greq->monc->client->osdc;
3052         struct ceph_osd_linger_request *lreq;
3053         u64 linger_id = greq->private_data;
3054
3055         WARN_ON(greq->result || !greq->u.newest);
3056
3057         down_write(&osdc->lock);
3058         lreq = lookup_linger_mc(&osdc->linger_map_checks, linger_id);
3059         if (!lreq) {
3060                 dout("%s linger_id %llu dne\n", __func__, linger_id);
3061                 goto out_unlock;
3062         }
3063
3064         dout("%s lreq %p linger_id %llu map_dne_bound %u newest %llu\n",
3065              __func__, lreq, lreq->linger_id, lreq->map_dne_bound,
3066              greq->u.newest);
3067         if (!lreq->map_dne_bound)
3068                 lreq->map_dne_bound = greq->u.newest;
3069         erase_linger_mc(&osdc->linger_map_checks, lreq);
3070         check_linger_pool_dne(lreq);
3071
3072         linger_put(lreq);
3073 out_unlock:
3074         up_write(&osdc->lock);
3075 }
3076
3077 static void send_linger_map_check(struct ceph_osd_linger_request *lreq)
3078 {
3079         struct ceph_osd_client *osdc = lreq->osdc;
3080         struct ceph_osd_linger_request *lookup_lreq;
3081         int ret;
3082
3083         verify_osdc_wrlocked(osdc);
3084
3085         lookup_lreq = lookup_linger_mc(&osdc->linger_map_checks,
3086                                        lreq->linger_id);
3087         if (lookup_lreq) {
3088                 WARN_ON(lookup_lreq != lreq);
3089                 return;
3090         }
3091
3092         linger_get(lreq);
3093         insert_linger_mc(&osdc->linger_map_checks, lreq);
3094         ret = ceph_monc_get_version_async(&osdc->client->monc, "osdmap",
3095                                           linger_map_check_cb, lreq->linger_id);
3096         WARN_ON(ret);
3097 }
3098
3099 static int linger_reg_commit_wait(struct ceph_osd_linger_request *lreq)
3100 {
3101         int ret;
3102
3103         dout("%s lreq %p linger_id %llu\n", __func__, lreq, lreq->linger_id);
3104         ret = wait_for_completion_interruptible(&lreq->reg_commit_wait);
3105         return ret ?: lreq->reg_commit_error;
3106 }
3107
3108 static int linger_notify_finish_wait(struct ceph_osd_linger_request *lreq)
3109 {
3110         int ret;
3111
3112         dout("%s lreq %p linger_id %llu\n", __func__, lreq, lreq->linger_id);
3113         ret = wait_for_completion_interruptible(&lreq->notify_finish_wait);
3114         return ret ?: lreq->notify_finish_error;
3115 }
3116
3117 /*
3118  * Timeout callback, called every N seconds.  When one or more OSD
3119  * requests have been active for more than N seconds, we send a keepalive
3120  * (tag + timestamp) to each such OSD to ensure any communications channel
3121  * reset is detected.
3122  */
3123 static void handle_timeout(struct work_struct *work)
3124 {
3125         struct ceph_osd_client *osdc =
3126                 container_of(work, struct ceph_osd_client, timeout_work.work);
3127         struct ceph_options *opts = osdc->client->options;
3128         unsigned long cutoff = jiffies - opts->osd_keepalive_timeout;
3129         unsigned long expiry_cutoff = jiffies - opts->osd_request_timeout;
3130         LIST_HEAD(slow_osds);
3131         struct rb_node *n, *p;
3132
3133         dout("%s osdc %p\n", __func__, osdc);
3134         down_write(&osdc->lock);
3135
3136         /*
3137          * ping osds that are a bit slow.  this ensures that if there
3138          * is a break in the TCP connection we will notice, and reopen
3139          * a connection with that osd (from the fault callback).
3140          */
3141         for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
3142                 struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
3143                 bool found = false;
3144
3145                 for (p = rb_first(&osd->o_requests); p; ) {
3146                         struct ceph_osd_request *req =
3147                             rb_entry(p, struct ceph_osd_request, r_node);
3148
3149                         p = rb_next(p); /* abort_request() */
3150
3151                         if (time_before(req->r_stamp, cutoff)) {
3152                                 dout(" req %p tid %llu on osd%d is laggy\n",
3153                                      req, req->r_tid, osd->o_osd);
3154                                 found = true;
3155                         }
3156                         if (opts->osd_request_timeout &&
3157                             time_before(req->r_start_stamp, expiry_cutoff)) {
3158                                 pr_err_ratelimited("tid %llu on osd%d timeout\n",
3159                                        req->r_tid, osd->o_osd);
3160                                 abort_request(req, -ETIMEDOUT);
3161                         }
3162                 }
3163                 for (p = rb_first(&osd->o_linger_requests); p; p = rb_next(p)) {
3164                         struct ceph_osd_linger_request *lreq =
3165                             rb_entry(p, struct ceph_osd_linger_request, node);
3166
3167                         dout(" lreq %p linger_id %llu is served by osd%d\n",
3168                              lreq, lreq->linger_id, osd->o_osd);
3169                         found = true;
3170
3171                         mutex_lock(&lreq->lock);
3172                         if (lreq->is_watch && lreq->committed && !lreq->last_error)
3173                                 send_linger_ping(lreq);
3174                         mutex_unlock(&lreq->lock);
3175                 }
3176
3177                 if (found)
3178                         list_move_tail(&osd->o_keepalive_item, &slow_osds);
3179         }
3180
3181         if (opts->osd_request_timeout) {
3182                 for (p = rb_first(&osdc->homeless_osd.o_requests); p; ) {
3183                         struct ceph_osd_request *req =
3184                             rb_entry(p, struct ceph_osd_request, r_node);
3185
3186                         p = rb_next(p); /* abort_request() */
3187
3188                         if (time_before(req->r_start_stamp, expiry_cutoff)) {
3189                                 pr_err_ratelimited("tid %llu on osd%d timeout\n",
3190                                        req->r_tid, osdc->homeless_osd.o_osd);
3191                                 abort_request(req, -ETIMEDOUT);
3192                         }
3193                 }
3194         }
3195
3196         if (atomic_read(&osdc->num_homeless) || !list_empty(&slow_osds))
3197                 maybe_request_map(osdc);
3198
3199         while (!list_empty(&slow_osds)) {
3200                 struct ceph_osd *osd = list_first_entry(&slow_osds,
3201                                                         struct ceph_osd,
3202                                                         o_keepalive_item);
3203                 list_del_init(&osd->o_keepalive_item);
3204                 ceph_con_keepalive(&osd->o_con);
3205         }
3206
3207         up_write(&osdc->lock);
3208         schedule_delayed_work(&osdc->timeout_work,
3209                               osdc->client->options->osd_keepalive_timeout);
3210 }
3211
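/*
 * Periodically close OSD sessions that have sat idle on the LRU for
 * longer than osd_idle_ttl.
 */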
3212 static void handle_osds_timeout(struct work_struct *work)
3213 {
3214         struct ceph_osd_client *osdc =
3215                 container_of(work, struct ceph_osd_client,
3216                              osds_timeout_work.work);
3217         unsigned long delay = osdc->client->options->osd_idle_ttl / 4;
3218         struct ceph_osd *osd, *nosd;
3219
3220         dout("%s osdc %p\n", __func__, osdc);
3221         down_write(&osdc->lock);
3222         list_for_each_entry_safe(osd, nosd, &osdc->osd_lru, o_osd_lru) {
3223                 if (time_before(jiffies, osd->lru_ttl))
3224                         break;
3225
3226                 WARN_ON(!RB_EMPTY_ROOT(&osd->o_requests));
3227                 WARN_ON(!RB_EMPTY_ROOT(&osd->o_linger_requests));
3228                 close_osd(osd);
3229         }
3230
3231         up_write(&osdc->lock);
3232         schedule_delayed_work(&osdc->osds_timeout_work,
3233                               round_jiffies_relative(delay));
3234 }
3235
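/*
 * Decode ceph_object_locator.  Only the default locator is
 * supported: a non-empty key, a changed namespace or an explicit
 * hash all result in -EINVAL.
 */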
3236 static int ceph_oloc_decode(void **p, void *end,
3237                             struct ceph_object_locator *oloc)
3238 {
3239         u8 struct_v, struct_cv;
3240         u32 len;
3241         void *struct_end;
3242         int ret = 0;
3243
3244         ceph_decode_need(p, end, 1 + 1 + 4, e_inval);
3245         struct_v = ceph_decode_8(p);
3246         struct_cv = ceph_decode_8(p);
3247         if (struct_v < 3) {
3248                 pr_warn("got v %d < 3 cv %d of ceph_object_locator\n",
3249                         struct_v, struct_cv);
3250                 goto e_inval;
3251         }
3252         if (struct_cv > 6) {
3253                 pr_warn("got v %d cv %d > 6 of ceph_object_locator\n",
3254                         struct_v, struct_cv);
3255                 goto e_inval;
3256         }
3257         len = ceph_decode_32(p);
3258         ceph_decode_need(p, end, len, e_inval);
3259         struct_end = *p + len;
3260
3261         oloc->pool = ceph_decode_64(p);
3262         *p += 4; /* skip preferred */
3263
3264         len = ceph_decode_32(p);
3265         if (len > 0) {
3266                 pr_warn("ceph_object_locator::key is set\n");
3267                 goto e_inval;
3268         }
3269
3270         if (struct_v >= 5) {
3271                 bool changed = false;
3272
3273                 len = ceph_decode_32(p);
3274                 if (len > 0) {
3275                         ceph_decode_need(p, end, len, e_inval);
3276                         if (!oloc->pool_ns ||
3277                             ceph_compare_string(oloc->pool_ns, *p, len))
3278                                 changed = true;
3279                         *p += len;
3280                 } else {
3281                         if (oloc->pool_ns)
3282                                 changed = true;
3283                 }
3284                 if (changed) {
3285                         /* redirect changes namespace */
3286                         pr_warn("ceph_object_locator::nspace is changed\n");
3287                         goto e_inval;
3288                 }
3289         }
3290
3291         if (struct_v >= 6) {
3292                 s64 hash = ceph_decode_64(p);
3293                 if (hash != -1) {
3294                         pr_warn("ceph_object_locator::hash is set\n");
3295                         goto e_inval;
3296                 }
3297         }
3298
3299         /* skip the rest */
3300         *p = struct_end;
3301 out:
3302         return ret;
3303
3304 e_inval:
3305         ret = -EINVAL;
3306         goto out;
3307 }
3308
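/*
 * Decode ceph_request_redirect.  Only the object locator part is
 * honoured; a redirect that sets object_name is rejected and
 * osd_instructions are skipped.
 */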
3309 static int ceph_redirect_decode(void **p, void *end,
3310                                 struct ceph_request_redirect *redir)
3311 {
3312         u8 struct_v, struct_cv;
3313         u32 len;
3314         void *struct_end;
3315         int ret;
3316
3317         ceph_decode_need(p, end, 1 + 1 + 4, e_inval);
3318         struct_v = ceph_decode_8(p);
3319         struct_cv = ceph_decode_8(p);
3320         if (struct_cv > 1) {
3321                 pr_warn("got v %d cv %d > 1 of ceph_request_redirect\n",
3322                         struct_v, struct_cv);
3323                 goto e_inval;
3324         }
3325         len = ceph_decode_32(p);
3326         ceph_decode_need(p, end, len, e_inval);
3327         struct_end = *p + len;
3328
3329         ret = ceph_oloc_decode(p, end, &redir->oloc);
3330         if (ret)
3331                 goto out;
3332
3333         len = ceph_decode_32(p);
3334         if (len > 0) {
3335                 pr_warn("ceph_request_redirect::object_name is set\n");
3336                 goto e_inval;
3337         }
3338
3339         len = ceph_decode_32(p);
3340         *p += len; /* skip osd_instructions */
3341
3342         /* skip the rest */
3343         *p = struct_end;
3344 out:
3345         return ret;
3346
3347 e_inval:
3348         ret = -EINVAL;
3349         goto out;
3350 }
3351
3352 struct MOSDOpReply {
3353         struct ceph_pg pgid;
3354         u64 flags;
3355         int result;
3356         u32 epoch;
3357         int num_ops;
3358         u32 outdata_len[CEPH_OSD_MAX_OPS];
3359         s32 rval[CEPH_OSD_MAX_OPS];
3360         int retry_attempt;
3361         struct ceph_eversion replay_version;
3362         u64 user_version;
3363         struct ceph_request_redirect redirect;
3364 };
3365
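/*
 * Decode MOSDOpReply into @m.  v4 encoding is assumed as a minimum;
 * replay_version/user_version were added in v5 and the redirect in
 * v6, gated by an explicit presence byte from v7 on.
 */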
3366 static int decode_MOSDOpReply(const struct ceph_msg *msg, struct MOSDOpReply *m)
3367 {
3368         void *p = msg->front.iov_base;
3369         void *const end = p + msg->front.iov_len;
3370         u16 version = le16_to_cpu(msg->hdr.version);
3371         struct ceph_eversion bad_replay_version;
3372         u8 decode_redir;
3373         u32 len;
3374         int ret;
3375         int i;
3376
3377         ceph_decode_32_safe(&p, end, len, e_inval);
3378         ceph_decode_need(&p, end, len, e_inval);
3379         p += len; /* skip oid */
3380
3381         ret = ceph_decode_pgid(&p, end, &m->pgid);
3382         if (ret)
3383                 return ret;
3384
3385         ceph_decode_64_safe(&p, end, m->flags, e_inval);
3386         ceph_decode_32_safe(&p, end, m->result, e_inval);
3387         ceph_decode_need(&p, end, sizeof(bad_replay_version), e_inval);
3388         memcpy(&bad_replay_version, p, sizeof(bad_replay_version));
3389         p += sizeof(bad_replay_version);
3390         ceph_decode_32_safe(&p, end, m->epoch, e_inval);
3391
3392         ceph_decode_32_safe(&p, end, m->num_ops, e_inval);
3393         if (m->num_ops > ARRAY_SIZE(m->outdata_len))
3394                 goto e_inval;
3395
3396         ceph_decode_need(&p, end, m->num_ops * sizeof(struct ceph_osd_op),
3397                          e_inval);
3398         for (i = 0; i < m->num_ops; i++) {
3399                 struct ceph_osd_op *op = p;
3400
3401                 m->outdata_len[i] = le32_to_cpu(op->payload_len);
3402                 p += sizeof(*op);
3403         }
3404
3405         ceph_decode_32_safe(&p, end, m->retry_attempt, e_inval);
3406         for (i = 0; i < m->num_ops; i++)
3407                 ceph_decode_32_safe(&p, end, m->rval[i], e_inval);
3408
3409         if (version >= 5) {
3410                 ceph_decode_need(&p, end, sizeof(m->replay_version), e_inval);
3411                 memcpy(&m->replay_version, p, sizeof(m->replay_version));
3412                 p += sizeof(m->replay_version);
3413                 ceph_decode_64_safe(&p, end, m->user_version, e_inval);
3414         } else {
3415                 m->replay_version = bad_replay_version; /* struct */
3416                 m->user_version = le64_to_cpu(m->replay_version.version);
3417         }
3418
3419         if (version >= 6) {
3420                 if (version >= 7)
3421                         ceph_decode_8_safe(&p, end, decode_redir, e_inval);
3422                 else
3423                         decode_redir = 1;
3424         } else {
3425                 decode_redir = 0;
3426         }
3427
3428         if (decode_redir) {
3429                 ret = ceph_redirect_decode(&p, end, &m->redirect);
3430                 if (ret)
3431                         return ret;
3432         } else {
3433                 ceph_oloc_init(&m->redirect.oloc);
3434         }
3435
3436         return 0;
3437
3438 e_inval:
3439         return -EINVAL;
3440 }
3441
3442 /*
3443  * Handle MOSDOpReply.  Set ->r_result and call the callback if it is
3444  * specified.
3445  */
3446 static void handle_reply(struct ceph_osd *osd, struct ceph_msg *msg)
3447 {
3448         struct ceph_osd_client *osdc = osd->o_osdc;
3449         struct ceph_osd_request *req;
3450         struct MOSDOpReply m;
3451         u64 tid = le64_to_cpu(msg->hdr.tid);
3452         u32 data_len = 0;
3453         int ret;
3454         int i;
3455
3456         dout("%s msg %p tid %llu\n", __func__, msg, tid);
3457
3458         down_read(&osdc->lock);
3459         if (!osd_registered(osd)) {
3460                 dout("%s osd%d unknown\n", __func__, osd->o_osd);
3461                 goto out_unlock_osdc;
3462         }
3463         WARN_ON(osd->o_osd != le64_to_cpu(msg->hdr.src.num));
3464
3465         mutex_lock(&osd->lock);
3466         req = lookup_request(&osd->o_requests, tid);
3467         if (!req) {
3468                 dout("%s osd%d tid %llu unknown\n", __func__, osd->o_osd, tid);
3469                 goto out_unlock_session;
3470         }
3471
3472         m.redirect.oloc.pool_ns = req->r_t.target_oloc.pool_ns;
3473         ret = decode_MOSDOpReply(msg, &m);
3474         m.redirect.oloc.pool_ns = NULL;
3475         if (ret) {
3476                 pr_err("failed to decode MOSDOpReply for tid %llu: %d\n",
3477                        req->r_tid, ret);
3478                 ceph_msg_dump(msg);
3479                 goto fail_request;
3480         }
3481         dout("%s req %p tid %llu flags 0x%llx pgid %llu.%x epoch %u attempt %d v %u'%llu uv %llu\n",
3482              __func__, req, req->r_tid, m.flags, m.pgid.pool, m.pgid.seed,
3483              m.epoch, m.retry_attempt, le32_to_cpu(m.replay_version.epoch),
3484              le64_to_cpu(m.replay_version.version), m.user_version);
3485
3486         if (m.retry_attempt >= 0) {
3487                 if (m.retry_attempt != req->r_attempts - 1) {
3488                         dout("req %p tid %llu retry_attempt %d != %d, ignoring\n",
3489                              req, req->r_tid, m.retry_attempt,
3490                              req->r_attempts - 1);
3491                         goto out_unlock_session;
3492                 }
3493         } else {
3494                 WARN_ON(1); /* MOSDOpReply v4 is assumed */
3495         }
3496
3497         if (!ceph_oloc_empty(&m.redirect.oloc)) {
3498                 dout("req %p tid %llu redirect pool %lld\n", req, req->r_tid,
3499                      m.redirect.oloc.pool);
3500                 unlink_request(osd, req);
3501                 mutex_unlock(&osd->lock);
3502
3503                 /*
3504                  * Not ceph_oloc_copy() - changing pool_ns is not
3505                  * supported.
3506                  */
3507                 req->r_t.target_oloc.pool = m.redirect.oloc.pool;
3508                 req->r_flags |= CEPH_OSD_FLAG_REDIRECTED;
3509                 req->r_tid = 0;
3510                 __submit_request(req, false);
3511                 goto out_unlock_osdc;
3512         }
3513
3514         if (m.num_ops != req->r_num_ops) {
3515                 pr_err("num_ops %d != %d for tid %llu\n", m.num_ops,
3516                        req->r_num_ops, req->r_tid);
3517                 goto fail_request;
3518         }
3519         for (i = 0; i < req->r_num_ops; i++) {
3520                 dout(" req %p tid %llu op %d rval %d len %u\n", req,
3521                      req->r_tid, i, m.rval[i], m.outdata_len[i]);
3522                 req->r_ops[i].rval = m.rval[i];
3523                 req->r_ops[i].outdata_len = m.outdata_len[i];
3524                 data_len += m.outdata_len[i];
3525         }
3526         if (data_len != le32_to_cpu(msg->hdr.data_len)) {
3527                 pr_err("sum of lens %u != %u for tid %llu\n", data_len,
3528                        le32_to_cpu(msg->hdr.data_len), req->r_tid);
3529                 goto fail_request;
3530         }
3531         dout("%s req %p tid %llu result %d data_len %u\n", __func__,
3532              req, req->r_tid, m.result, data_len);
3533
3534         /*
3535          * Since we only ever request ONDISK, we should only ever get
3536          * one (type of) reply back.
3537          */
3538         WARN_ON(!(m.flags & CEPH_OSD_FLAG_ONDISK));
3539         req->r_result = m.result ?: data_len;
3540         finish_request(req);
3541         mutex_unlock(&osd->lock);
3542         up_read(&osdc->lock);
3543
3544         __complete_request(req);
3545         complete_all(&req->r_completion);
3546         ceph_osdc_put_request(req);
3547         return;
3548
3549 fail_request:
3550         complete_request(req, -EIO);
3551 out_unlock_session:
3552         mutex_unlock(&osd->lock);
3553 out_unlock_osdc:
3554         up_read(&osdc->lock);
3555 }
3556
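/*
 * Snapshot each pool's full flag before a new map is applied so
 * that pool_cleared_full() can detect a full -> not-full transition.
 */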
3557 static void set_pool_was_full(struct ceph_osd_client *osdc)
3558 {
3559         struct rb_node *n;
3560
3561         for (n = rb_first(&osdc->osdmap->pg_pools); n; n = rb_next(n)) {
3562                 struct ceph_pg_pool_info *pi =
3563                     rb_entry(n, struct ceph_pg_pool_info, node);
3564
3565                 pi->was_full = __pool_full(pi);
3566         }
3567 }
3568
3569 static bool pool_cleared_full(struct ceph_osd_client *osdc, s64 pool_id)
3570 {
3571         struct ceph_pg_pool_info *pi;
3572
3573         pi = ceph_pg_pool_by_id(osdc->osdmap, pool_id);
3574         if (!pi)
3575                 return false;
3576
3577         return pi->was_full && !__pool_full(pi);
3578 }
3579
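/*
 * Recompute the linger's target and, if it maps to a different OSD,
 * move it over to the new session.
 */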
3580 static enum calc_target_result
3581 recalc_linger_target(struct ceph_osd_linger_request *lreq)
3582 {
3583         struct ceph_osd_client *osdc = lreq->osdc;
3584         enum calc_target_result ct_res;
3585
3586         ct_res = calc_target(osdc, &lreq->t, NULL, true);
3587         if (ct_res == CALC_TARGET_NEED_RESEND) {
3588                 struct ceph_osd *osd;
3589
3590                 osd = lookup_create_osd(osdc, lreq->t.osd, true);
3591                 if (osd != lreq->osd) {
3592                         unlink_linger(lreq->osd, lreq);
3593                         link_linger(osd, lreq);
3594                 }
3595         }
3596
3597         return ct_res;
3598 }
3599
3600 /*
3601  * Requeue requests whose mapping to an OSD has changed.
3602  */
3603 static void scan_requests(struct ceph_osd *osd,
3604                           bool force_resend,
3605                           bool cleared_full,
3606                           bool check_pool_cleared_full,
3607                           struct rb_root *need_resend,
3608                           struct list_head *need_resend_linger)
3609 {
3610         struct ceph_osd_client *osdc = osd->o_osdc;
3611         struct rb_node *n;
3612         bool force_resend_writes;
3613
3614         for (n = rb_first(&osd->o_linger_requests); n; ) {
3615                 struct ceph_osd_linger_request *lreq =
3616                     rb_entry(n, struct ceph_osd_linger_request, node);
3617                 enum calc_target_result ct_res;
3618
3619                 n = rb_next(n); /* recalc_linger_target() */
3620
3621                 dout("%s lreq %p linger_id %llu\n", __func__, lreq,
3622                      lreq->linger_id);
3623                 ct_res = recalc_linger_target(lreq);
3624                 switch (ct_res) {
3625                 case CALC_TARGET_NO_ACTION:
3626                         force_resend_writes = cleared_full ||
3627                             (check_pool_cleared_full &&
3628                              pool_cleared_full(osdc, lreq->t.base_oloc.pool));
3629                         if (!force_resend && !force_resend_writes)
3630                                 break;
3631
3632                         /* fall through */
3633                 case CALC_TARGET_NEED_RESEND:
3634                         cancel_linger_map_check(lreq);
3635                         /*
3636                          * scan_requests() for the previous epoch(s)
3637                          * may have already added it to the list, since
3638                          * it's not unlinked here.
3639                          */
3640                         if (list_empty(&lreq->scan_item))
3641                                 list_add_tail(&lreq->scan_item, need_resend_linger);
3642                         break;
3643                 case CALC_TARGET_POOL_DNE:
3644                         list_del_init(&lreq->scan_item);
3645                         check_linger_pool_dne(lreq);
3646                         break;
3647                 }
3648         }
3649
3650         for (n = rb_first(&osd->o_requests); n; ) {
3651                 struct ceph_osd_request *req =
3652                     rb_entry(n, struct ceph_osd_request, r_node);
3653                 enum calc_target_result ct_res;
3654
3655                 n = rb_next(n); /* unlink_request(), check_pool_dne() */
3656
3657                 dout("%s req %p tid %llu\n", __func__, req, req->r_tid);
3658                 ct_res = calc_target(osdc, &req->r_t, &req->r_osd->o_con,
3659                                      false);
3660                 switch (ct_res) {
3661                 case CALC_TARGET_NO_ACTION:
3662                         force_resend_writes = cleared_full ||
3663                             (check_pool_cleared_full &&
3664                              pool_cleared_full(osdc, req->r_t.base_oloc.pool));
3665                         if (!force_resend &&
3666                             (!(req->r_flags & CEPH_OSD_FLAG_WRITE) ||
3667                              !force_resend_writes))
3668                                 break;
3669
3670                         /* fall through */
3671                 case CALC_TARGET_NEED_RESEND:
3672                         cancel_map_check(req);
3673                         unlink_request(osd, req);
3674                         insert_request(need_resend, req);
3675                         break;
3676                 case CALC_TARGET_POOL_DNE:
3677                         check_pool_dne(req);
3678                         break;
3679                 }
3680         }
3681 }
3682
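/*
 * Decode and install a single incremental or full map, then scan
 * all requests for ones that need to be resent as a result.  OSD
 * sessions whose peer went down or changed address are closed.
 */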
3683 static int handle_one_map(struct ceph_osd_client *osdc,
3684                           void *p, void *end, bool incremental,
3685                           struct rb_root *need_resend,
3686                           struct list_head *need_resend_linger)
3687 {
3688         struct ceph_osdmap *newmap;
3689         struct rb_node *n;
3690         bool skipped_map = false;
3691         bool was_full;
3692
3693         was_full = ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL);
3694         set_pool_was_full(osdc);
3695
3696         if (incremental)
3697                 newmap = osdmap_apply_incremental(&p, end, osdc->osdmap);
3698         else
3699                 newmap = ceph_osdmap_decode(&p, end);
3700         if (IS_ERR(newmap))
3701                 return PTR_ERR(newmap);
3702
3703         if (newmap != osdc->osdmap) {
3704                 /*
3705                  * Preserve ->was_full before destroying the old map.
3706                  * For pools that weren't in the old map, ->was_full
3707                  * should be false.
3708                  */
3709                 for (n = rb_first(&newmap->pg_pools); n; n = rb_next(n)) {
3710                         struct ceph_pg_pool_info *pi =
3711                             rb_entry(n, struct ceph_pg_pool_info, node);
3712                         struct ceph_pg_pool_info *old_pi;
3713
3714                         old_pi = ceph_pg_pool_by_id(osdc->osdmap, pi->id);
3715                         if (old_pi)
3716                                 pi->was_full = old_pi->was_full;
3717                         else
3718                                 WARN_ON(pi->was_full);
3719                 }
3720
3721                 if (osdc->osdmap->epoch &&
3722                     osdc->osdmap->epoch + 1 < newmap->epoch) {
3723                         WARN_ON(incremental);
3724                         skipped_map = true;
3725                 }
3726
3727                 ceph_osdmap_destroy(osdc->osdmap);
3728                 osdc->osdmap = newmap;
3729         }
3730
3731         was_full &= !ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL);
3732         scan_requests(&osdc->homeless_osd, skipped_map, was_full, true,
3733                       need_resend, need_resend_linger);
3734
3735         for (n = rb_first(&osdc->osds); n; ) {
3736                 struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
3737
3738                 n = rb_next(n); /* close_osd() */
3739
3740                 scan_requests(osd, skipped_map, was_full, true, need_resend,
3741                               need_resend_linger);
3742                 if (!ceph_osd_is_up(osdc->osdmap, osd->o_osd) ||
3743                     memcmp(&osd->o_con.peer_addr,
3744                            ceph_osd_addr(osdc->osdmap, osd->o_osd),
3745                            sizeof(struct ceph_entity_addr)))
3746                         close_osd(osd);
3747         }
3748
3749         return 0;
3750 }
3751
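/*
 * Resend the requests and lingers collected by scan_requests(),
 * recalculating targets against the latest map first.
 */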
3752 static void kick_requests(struct ceph_osd_client *osdc,
3753                           struct rb_root *need_resend,
3754                           struct list_head *need_resend_linger)
3755 {
3756         struct ceph_osd_linger_request *lreq, *nlreq;
3757         enum calc_target_result ct_res;
3758         struct rb_node *n;
3759
3760         /* make sure need_resend targets reflect latest map */
3761         for (n = rb_first(need_resend); n; ) {
3762                 struct ceph_osd_request *req =
3763                     rb_entry(n, struct ceph_osd_request, r_node);
3764
3765                 n = rb_next(n);
3766
3767                 if (req->r_t.epoch < osdc->osdmap->epoch) {
3768                         ct_res = calc_target(osdc, &req->r_t, NULL, false);
3769                         if (ct_res == CALC_TARGET_POOL_DNE) {
3770                                 erase_request(need_resend, req);
3771                                 check_pool_dne(req);
3772                         }
3773                 }
3774         }
3775
3776         for (n = rb_first(need_resend); n; ) {
3777                 struct ceph_osd_request *req =
3778                     rb_entry(n, struct ceph_osd_request, r_node);
3779                 struct ceph_osd *osd;
3780
3781                 n = rb_next(n);
3782                 erase_request(need_resend, req); /* before link_request() */
3783
3784                 osd = lookup_create_osd(osdc, req->r_t.osd, true);
3785                 link_request(osd, req);
3786                 if (!req->r_linger) {
3787                         if (!osd_homeless(osd) && !req->r_t.paused)
3788                                 send_request(req);
3789                 } else {
3790                         cancel_linger_request(req);
3791                 }
3792         }
3793
3794         list_for_each_entry_safe(lreq, nlreq, need_resend_linger, scan_item) {
3795                 if (!osd_homeless(lreq->osd))
3796                         send_linger(lreq);
3797
3798                 list_del_init(&lreq->scan_item);
3799         }
3800 }
3801
3802 /*
3803  * Process updated osd map.
3804  *
3805  * The message contains any number of incremental and full maps, normally
3806  * indicating some sort of topology change in the cluster.  Kick requests
3807  * off to different OSDs as needed.
3808  */
3809 void ceph_osdc_handle_map(struct ceph_osd_client *osdc, struct ceph_msg *msg)
3810 {
3811         void *p = msg->front.iov_base;
3812         void *const end = p + msg->front.iov_len;
3813         u32 nr_maps, maplen;
3814         u32 epoch;
3815         struct ceph_fsid fsid;
3816         struct rb_root need_resend = RB_ROOT;
3817         LIST_HEAD(need_resend_linger);
3818         bool handled_incremental = false;
3819         bool was_pauserd, was_pausewr;
3820         bool pauserd, pausewr;
3821         int err;
3822
3823         dout("%s have %u\n", __func__, osdc->osdmap->epoch);
3824         down_write(&osdc->lock);
3825
3826         /* verify fsid */
3827         ceph_decode_need(&p, end, sizeof(fsid), bad);
3828         ceph_decode_copy(&p, &fsid, sizeof(fsid));
3829         if (ceph_check_fsid(osdc->client, &fsid) < 0)
3830                 goto bad;
3831
3832         was_pauserd = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD);
3833         was_pausewr = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR) ||
3834                       ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
3835                       have_pool_full(osdc);
3836
3837         /* incremental maps */
3838         ceph_decode_32_safe(&p, end, nr_maps, bad);
3839         dout(" %d inc maps\n", nr_maps);
3840         while (nr_maps > 0) {
3841                 ceph_decode_need(&p, end, 2*sizeof(u32), bad);
3842                 epoch = ceph_decode_32(&p);
3843                 maplen = ceph_decode_32(&p);
3844                 ceph_decode_need(&p, end, maplen, bad);
3845                 if (osdc->osdmap->epoch &&
3846                     osdc->osdmap->epoch + 1 == epoch) {
3847                         dout("applying incremental map %u len %d\n",
3848                              epoch, maplen);
3849                         err = handle_one_map(osdc, p, p + maplen, true,
3850                                              &need_resend, &need_resend_linger);
3851                         if (err)
3852                                 goto bad;
3853                         handled_incremental = true;
3854                 } else {
3855                         dout("ignoring incremental map %u len %d\n",
3856                              epoch, maplen);
3857                 }
3858                 p += maplen;
3859                 nr_maps--;
3860         }
3861         if (handled_incremental)
3862                 goto done;
3863
3864         /* full maps */
3865         ceph_decode_32_safe(&p, end, nr_maps, bad);
3866         dout(" %d full maps\n", nr_maps);
3867         while (nr_maps) {
3868                 ceph_decode_need(&p, end, 2*sizeof(u32), bad);
3869                 epoch = ceph_decode_32(&p);
3870                 maplen = ceph_decode_32(&p);
3871                 ceph_decode_need(&p, end, maplen, bad);
3872                 if (nr_maps > 1) {
3873                         dout("skipping non-latest full map %u len %d\n",
3874                              epoch, maplen);
3875                 } else if (osdc->osdmap->epoch >= epoch) {
3876                         dout("skipping full map %u len %d, "
3877                              "older than our %u\n", epoch, maplen,
3878                              osdc->osdmap->epoch);
3879                 } else {
3880                         dout("taking full map %u len %d\n", epoch, maplen);
3881                         err = handle_one_map(osdc, p, p + maplen, false,
3882                                              &need_resend, &need_resend_linger);
3883                         if (err)
3884                                 goto bad;
3885                 }
3886                 p += maplen;
3887                 nr_maps--;
3888         }
3889
3890 done:
3891         /*
3892          * subscribe to subsequent osdmap updates if we are paused or
3893          * full, to ensure we find out when we are no longer full and
3894          * can stop returning ENOSPC.
3895          */
3896         pauserd = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSERD);
3897         pausewr = ceph_osdmap_flag(osdc, CEPH_OSDMAP_PAUSEWR) ||
3898                   ceph_osdmap_flag(osdc, CEPH_OSDMAP_FULL) ||
3899                   have_pool_full(osdc);
3900         if (was_pauserd || was_pausewr || pauserd || pausewr ||
3901             osdc->osdmap->epoch < osdc->epoch_barrier)
3902                 maybe_request_map(osdc);
3903
3904         kick_requests(osdc, &need_resend, &need_resend_linger);
3905
3906         ceph_osdc_abort_on_full(osdc);
3907         ceph_monc_got_map(&osdc->client->monc, CEPH_SUB_OSDMAP,
3908                           osdc->osdmap->epoch);
3909         up_write(&osdc->lock);
3910         wake_up_all(&osdc->client->auth_wq);
3911         return;
3912
3913 bad:
3914         pr_err("osdc handle_map corrupt msg\n");
3915         ceph_msg_dump(msg);
3916         up_write(&osdc->lock);
3917 }
3918
3919 /*
3920  * Resubmit requests pending on the given osd.
3921  */
3922 static void kick_osd_requests(struct ceph_osd *osd)
3923 {
3924         struct rb_node *n;
3925
3926         clear_backoffs(osd);
3927
3928         for (n = rb_first(&osd->o_requests); n; ) {
3929                 struct ceph_osd_request *req =
3930                     rb_entry(n, struct ceph_osd_request, r_node);
3931
3932                 n = rb_next(n); /* cancel_linger_request() */
3933
3934                 if (!req->r_linger) {
3935                         if (!req->r_t.paused)
3936                                 send_request(req);
3937                 } else {
3938                         cancel_linger_request(req);
3939                 }
3940         }
3941         for (n = rb_first(&osd->o_linger_requests); n; n = rb_next(n)) {
3942                 struct ceph_osd_linger_request *lreq =
3943                     rb_entry(n, struct ceph_osd_linger_request, node);
3944
3945                 send_linger(lreq);
3946         }
3947 }
3948
3949 /*
3950  * If the osd connection drops, we need to resubmit all requests.
3951  */
3952 static void osd_fault(struct ceph_connection *con)
3953 {
3954         struct ceph_osd *osd = con->private;
3955         struct ceph_osd_client *osdc = osd->o_osdc;
3956
3957         dout("%s osd %p osd%d\n", __func__, osd, osd->o_osd);
3958
3959         down_write(&osdc->lock);
3960         if (!osd_registered(osd)) {
3961                 dout("%s osd%d unknown\n", __func__, osd->o_osd);
3962                 goto out_unlock;
3963         }
3964
3965         if (!reopen_osd(osd))
3966                 kick_osd_requests(osd);
3967         maybe_request_map(osdc);
3968
3969 out_unlock:
3970         up_write(&osdc->lock);
3971 }
3972
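/*
 * Decoded MOSDBackoff.  ->begin and ->end are allocated by
 * decode_MOSDBackoff() and must be freed by the caller unless
 * ownership is transferred (see handle_backoff_block()).
 */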
3973 struct MOSDBackoff {
3974         struct ceph_spg spgid;
3975         u32 map_epoch;
3976         u8 op;
3977         u64 id;
3978         struct ceph_hobject_id *begin;
3979         struct ceph_hobject_id *end;
3980 };
3981
3982 static int decode_MOSDBackoff(const struct ceph_msg *msg, struct MOSDBackoff *m)
3983 {
3984         void *p = msg->front.iov_base;
3985         void *const end = p + msg->front.iov_len;
3986         u8 struct_v;
3987         u32 struct_len;
3988         int ret;
3989
3990         ret = ceph_start_decoding(&p, end, 1, "spg_t", &struct_v, &struct_len);
3991         if (ret)
3992                 return ret;
3993
3994         ret = ceph_decode_pgid(&p, end, &m->spgid.pgid);
3995         if (ret)
3996                 return ret;
3997
3998         ceph_decode_8_safe(&p, end, m->spgid.shard, e_inval);
3999         ceph_decode_32_safe(&p, end, m->map_epoch, e_inval);
4000         ceph_decode_8_safe(&p, end, m->op, e_inval);
4001         ceph_decode_64_safe(&p, end, m->id, e_inval);
4002
4003         m->begin = kzalloc(sizeof(*m->begin), GFP_NOIO);
4004         if (!m->begin)
4005                 return -ENOMEM;
4006
4007         ret = decode_hoid(&p, end, m->begin);
4008         if (ret) {
4009                 free_hoid(m->begin);
4010                 return ret;
4011         }
4012
4013         m->end = kzalloc(sizeof(*m->end), GFP_NOIO);
4014         if (!m->end) {
4015                 free_hoid(m->begin);
4016                 return -ENOMEM;
4017         }
4018
4019         ret = decode_hoid(&p, end, m->end);
4020         if (ret) {
4021                 free_hoid(m->begin);
4022                 free_hoid(m->end);
4023                 return ret;
4024         }
4025
4026         return 0;
4027
4028 e_inval:
4029         return -EINVAL;
4030 }
4031
4032 static struct ceph_msg *create_backoff_message(
4033                                 const struct ceph_osd_backoff *backoff,
4034                                 u32 map_epoch)
4035 {
4036         struct ceph_msg *msg;
4037         void *p, *end;
4038         int msg_size;
4039
4040         msg_size = CEPH_ENCODING_START_BLK_LEN +
4041                         CEPH_PGID_ENCODING_LEN + 1; /* spgid */
4042         msg_size += 4 + 1 + 8; /* map_epoch, op, id */
4043         msg_size += CEPH_ENCODING_START_BLK_LEN +
4044                         hoid_encoding_size(backoff->begin);
4045         msg_size += CEPH_ENCODING_START_BLK_LEN +
4046                         hoid_encoding_size(backoff->end);
4047
4048         msg = ceph_msg_new(CEPH_MSG_OSD_BACKOFF, msg_size, GFP_NOIO, true);
4049         if (!msg)
4050                 return NULL;
4051
4052         p = msg->front.iov_base;
4053         end = p + msg->front_alloc_len;
4054
4055         encode_spgid(&p, &backoff->spgid);
4056         ceph_encode_32(&p, map_epoch);
4057         ceph_encode_8(&p, CEPH_OSD_BACKOFF_OP_ACK_BLOCK);
4058         ceph_encode_64(&p, backoff->id);
4059         encode_hoid(&p, end, backoff->begin);
4060         encode_hoid(&p, end, backoff->end);
4061         BUG_ON(p != end);
4062
4063         msg->front.iov_len = p - msg->front.iov_base;
4064         msg->hdr.version = cpu_to_le16(1); /* MOSDBackoff v1 */
4065         msg->hdr.front_len = cpu_to_le32(msg->front.iov_len);
4066
4067         return msg;
4068 }
4069
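/*
 * Install a backoff for the given hobject range and ack it back to
 * the OSD.
 */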
4070 static void handle_backoff_block(struct ceph_osd *osd, struct MOSDBackoff *m)
4071 {
4072         struct ceph_spg_mapping *spg;
4073         struct ceph_osd_backoff *backoff;
4074         struct ceph_msg *msg;
4075
4076         dout("%s osd%d spgid %llu.%xs%d id %llu\n", __func__, osd->o_osd,
4077              m->spgid.pgid.pool, m->spgid.pgid.seed, m->spgid.shard, m->id);
4078
4079         spg = lookup_spg_mapping(&osd->o_backoff_mappings, &m->spgid);
4080         if (!spg) {
4081                 spg = alloc_spg_mapping();
4082                 if (!spg) {
4083                         pr_err("%s failed to allocate spg\n", __func__);
4084                         return;
4085                 }
4086                 spg->spgid = m->spgid; /* struct */
4087                 insert_spg_mapping(&osd->o_backoff_mappings, spg);
4088         }
4089
4090         backoff = alloc_backoff();
4091         if (!backoff) {
4092                 pr_err("%s failed to allocate backoff\n", __func__);
4093                 return;
4094         }
4095         backoff->spgid = m->spgid; /* struct */
4096         backoff->id = m->id;
4097         backoff->begin = m->begin;
4098         m->begin = NULL; /* backoff now owns this */
4099         backoff->end = m->end;
4100         m->end = NULL;   /* ditto */
4101
4102         insert_backoff(&spg->backoffs, backoff);
4103         insert_backoff_by_id(&osd->o_backoffs_by_id, backoff);
4104
4105         /*
4106          * Ack with original backoff's epoch so that the OSD can
4107          * discard this if there was a PG split.
4108          */
4109         msg = create_backoff_message(backoff, m->map_epoch);
4110         if (!msg) {
4111                 pr_err("%s failed to allocate msg\n", __func__);
4112                 return;
4113         }
4114         ceph_con_send(&osd->o_con, msg);
4115 }
4116
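/*
 * Return true if the request target lies within the half-open
 * backoff range [begin, end).
 */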
4117 static bool target_contained_by(const struct ceph_osd_request_target *t,
4118                                 const struct ceph_hobject_id *begin,
4119                                 const struct ceph_hobject_id *end)
4120 {
4121         struct ceph_hobject_id hoid;
4122         int cmp;
4123
4124         hoid_fill_from_target(&hoid, t);
4125         cmp = hoid_compare(&hoid, begin);
4126         return !cmp || (cmp > 0 && hoid_compare(&hoid, end) < 0);
4127 }
4128
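/*
 * Remove the matching backoff and resend any requests that it may
 * have been blocking.
 */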
4129 static void handle_backoff_unblock(struct ceph_osd *osd,
4130                                    const struct MOSDBackoff *m)
4131 {
4132         struct ceph_spg_mapping *spg;
4133         struct ceph_osd_backoff *backoff;
4134         struct rb_node *n;
4135
4136         dout("%s osd%d spgid %llu.%xs%d id %llu\n", __func__, osd->o_osd,
4137              m->spgid.pgid.pool, m->spgid.pgid.seed, m->spgid.shard, m->id);
4138
4139         backoff = lookup_backoff_by_id(&osd->o_backoffs_by_id, m->id);
4140         if (!backoff) {
4141                 pr_err("%s osd%d spgid %llu.%xs%d id %llu backoff dne\n",
4142                        __func__, osd->o_osd, m->spgid.pgid.pool,
4143                        m->spgid.pgid.seed, m->spgid.shard, m->id);
4144                 return;
4145         }
4146
4147         if (hoid_compare(backoff->begin, m->begin) &&
4148             hoid_compare(backoff->end, m->end)) {
4149                 pr_err("%s osd%d spgid %llu.%xs%d id %llu bad range?\n",
4150                        __func__, osd->o_osd, m->spgid.pgid.pool,
4151                        m->spgid.pgid.seed, m->spgid.shard, m->id);
4152                 /* unblock it anyway... */
4153         }
4154
4155         spg = lookup_spg_mapping(&osd->o_backoff_mappings, &backoff->spgid);
4156         BUG_ON(!spg);
4157
4158         erase_backoff(&spg->backoffs, backoff);
4159         erase_backoff_by_id(&osd->o_backoffs_by_id, backoff);
4160         free_backoff(backoff);
4161
4162         if (RB_EMPTY_ROOT(&spg->backoffs)) {
4163                 erase_spg_mapping(&osd->o_backoff_mappings, spg);
4164                 free_spg_mapping(spg);
4165         }
4166
4167         for (n = rb_first(&osd->o_requests); n; n = rb_next(n)) {
4168                 struct ceph_osd_request *req =
4169                     rb_entry(n, struct ceph_osd_request, r_node);
4170
4171                 if (!ceph_spg_compare(&req->r_t.spgid, &m->spgid)) {
4172                         /*
4173                          * Match against @m, not @backoff -- the PG may
4174                          * have split on the OSD.
4175                          */
4176                         if (target_contained_by(&req->r_t, m->begin, m->end)) {
4177                                 /*
4178                                  * If no other installed backoff applies,
4179                                  * resend.
4180                                  */
4181                                 send_request(req);
4182                         }
4183                 }
4184         }
4185 }
4186
4187 static void handle_backoff(struct ceph_osd *osd, struct ceph_msg *msg)
4188 {
4189         struct ceph_osd_client *osdc = osd->o_osdc;
4190         struct MOSDBackoff m;
4191         int ret;
4192
4193         down_read(&osdc->lock);
4194         if (!osd_registered(osd)) {
4195                 dout("%s osd%d unknown\n", __func__, osd->o_osd);
4196                 up_read(&osdc->lock);
4197                 return;
4198         }
4199         WARN_ON(osd->o_osd != le64_to_cpu(msg->hdr.src.num));
4200
4201         mutex_lock(&osd->lock);
4202         ret = decode_MOSDBackoff(msg, &m);
4203         if (ret) {
4204                 pr_err("failed to decode MOSDBackoff: %d\n", ret);
4205                 ceph_msg_dump(msg);
4206                 goto out_unlock;
4207         }
4208
4209         switch (m.op) {
4210         case CEPH_OSD_BACKOFF_OP_BLOCK:
4211                 handle_backoff_block(osd, &m);
4212                 break;
4213         case CEPH_OSD_BACKOFF_OP_UNBLOCK:
4214                 handle_backoff_unblock(osd, &m);
4215                 break;
4216         default:
4217                 pr_err("%s osd%d unknown op %d\n", __func__, osd->o_osd, m.op);
4218         }
4219
4220         free_hoid(m.begin);
4221         free_hoid(m.end);
4222
4223 out_unlock:
4224         mutex_unlock(&osd->lock);
4225         up_read(&osdc->lock);
4226 }
4227
4228 /*
4229  * Process osd watch notifications
4230  */
4231 static void handle_watch_notify(struct ceph_osd_client *osdc,
4232                                 struct ceph_msg *msg)
4233 {
4234         void *p = msg->front.iov_base;
4235         void *const end = p + msg->front.iov_len;
4236         struct ceph_osd_linger_request *lreq;
4237         struct linger_work *lwork;
4238         u8 proto_ver, opcode;
4239         u64 cookie, notify_id;
4240         u64 notifier_id = 0;
4241         s32 return_code = 0;
4242         void *payload = NULL;
4243         u32 payload_len = 0;
4244
4245         ceph_decode_8_safe(&p, end, proto_ver, bad);
4246         ceph_decode_8_safe(&p, end, opcode, bad);
4247         ceph_decode_64_safe(&p, end, cookie, bad);
4248         p += 8; /* skip ver */
4249         ceph_decode_64_safe(&p, end, notify_id, bad);
4250
4251         if (proto_ver >= 1) {
4252                 ceph_decode_32_safe(&p, end, payload_len, bad);
4253                 ceph_decode_need(&p, end, payload_len, bad);
4254                 payload = p;
4255                 p += payload_len;
4256         }
4257
4258         if (le16_to_cpu(msg->hdr.version) >= 2)
4259                 ceph_decode_32_safe(&p, end, return_code, bad);
4260
4261         if (le16_to_cpu(msg->hdr.version) >= 3)
4262                 ceph_decode_64_safe(&p, end, notifier_id, bad);
4263
4264         down_read(&osdc->lock);
4265         lreq = lookup_linger_osdc(&osdc->linger_requests, cookie);
4266         if (!lreq) {
4267                 dout("%s opcode %d cookie %llu dne\n", __func__, opcode,
4268                      cookie);
4269                 goto out_unlock_osdc;
4270         }
4271
4272         mutex_lock(&lreq->lock);
4273         dout("%s opcode %d cookie %llu lreq %p is_watch %d\n", __func__,
4274              opcode, cookie, lreq, lreq->is_watch);
4275         if (opcode == CEPH_WATCH_EVENT_DISCONNECT) {
4276                 if (!lreq->last_error) {
4277                         lreq->last_error = -ENOTCONN;
4278                         queue_watch_error(lreq);
4279                 }
4280         } else if (!lreq->is_watch) {
4281                 /* CEPH_WATCH_EVENT_NOTIFY_COMPLETE */
4282                 if (lreq->notify_id && lreq->notify_id != notify_id) {
4283                         dout("lreq %p notify_id %llu != %llu, ignoring\n", lreq,
4284                              lreq->notify_id, notify_id);
4285                 } else if (!completion_done(&lreq->notify_finish_wait)) {
4286                         struct ceph_msg_data *data =
4287                             list_first_entry_or_null(&msg->data,
4288                                                      struct ceph_msg_data,
4289                                                      links);
4290
4291                         if (data) {
4292                                 if (lreq->preply_pages) {
4293                                         WARN_ON(data->type !=
4294                                                         CEPH_MSG_DATA_PAGES);
4295                                         *lreq->preply_pages = data->pages;
4296                                         *lreq->preply_len = data->length;
4297                                 } else {
4298                                         ceph_release_page_vector(data->pages,
4299                                                calc_pages_for(0, data->length));
4300                                 }
4301                         }
4302                         lreq->notify_finish_error = return_code;
4303                         complete_all(&lreq->notify_finish_wait);
4304                 }
4305         } else {
4306                 /* CEPH_WATCH_EVENT_NOTIFY */
4307                 lwork = lwork_alloc(lreq, do_watch_notify);
4308                 if (!lwork) {
4309                         pr_err("failed to allocate notify-lwork\n");
4310                         goto out_unlock_lreq;
4311                 }
4312
4313                 lwork->notify.notify_id = notify_id;
4314                 lwork->notify.notifier_id = notifier_id;
4315                 lwork->notify.payload = payload;
4316                 lwork->notify.payload_len = payload_len;
4317                 lwork->notify.msg = ceph_msg_get(msg);
4318                 lwork_queue(lwork);
4319         }
4320
4321 out_unlock_lreq:
4322         mutex_unlock(&lreq->lock);
4323 out_unlock_osdc:
4324         up_read(&osdc->lock);
4325         return;
4326
4327 bad:
4328         pr_err("osdc handle_watch_notify corrupt msg\n");
4329 }
4330
4331 /*
4332  * Register request, send initial attempt.  @nofail is currently ignored.
4333  */
4334 int ceph_osdc_start_request(struct ceph_osd_client *osdc,
4335                             struct ceph_osd_request *req,
4336                             bool nofail)
4337 {
4338         down_read(&osdc->lock);
4339         submit_request(req, false);
4340         up_read(&osdc->lock);
4341
4342         return 0;
4343 }
4344 EXPORT_SYMBOL(ceph_osdc_start_request);
4345
4346 /*
4347  * Unregister a registered request.  The request is not completed:
4348  * ->r_result isn't set and __complete_request() isn't called.
4349  */
4350 void ceph_osdc_cancel_request(struct ceph_osd_request *req)
4351 {
4352         struct ceph_osd_client *osdc = req->r_osdc;
4353
4354         down_write(&osdc->lock);
4355         if (req->r_osd)
4356                 cancel_request(req);
4357         up_write(&osdc->lock);
4358 }
4359 EXPORT_SYMBOL(ceph_osdc_cancel_request);
4360
4361 /*
4362  * @timeout: in jiffies, 0 means "wait forever"
4363  */
4364 static int wait_request_timeout(struct ceph_osd_request *req,
4365                                 unsigned long timeout)
4366 {
4367         long left;
4368
4369         dout("%s req %p tid %llu\n", __func__, req, req->r_tid);
4370         left = wait_for_completion_killable_timeout(&req->r_completion,
4371                                                 ceph_timeout_jiffies(timeout));
4372         if (left <= 0) {
4373                 left = left ?: -ETIMEDOUT;
4374                 ceph_osdc_cancel_request(req);
4375         } else {
4376                 left = req->r_result; /* completed */
4377         }
4378
4379         return left;
4380 }
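
/*
 * Sketch, assuming a 30 second cap: @timeout is in jiffies, so callers
 * convert with msecs_to_jiffies() - compare ceph_osdc_unwatch() below,
 * which passes opts->mount_timeout.
 */
static int example_wait_capped(struct ceph_osd_request *req)
{
        return wait_request_timeout(req, msecs_to_jiffies(30 * MSEC_PER_SEC));
}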
4381
4382 /*
4383  * wait for a request to complete
4384  */
4385 int ceph_osdc_wait_request(struct ceph_osd_client *osdc,
4386                            struct ceph_osd_request *req)
4387 {
4388         return wait_request_timeout(req, 0);
4389 }
4390 EXPORT_SYMBOL(ceph_osdc_wait_request);
4391
4392 /*
4393  * sync - wait for all in-flight write requests to flush.  avoid starvation.
4394  */
4395 void ceph_osdc_sync(struct ceph_osd_client *osdc)
4396 {
4397         struct rb_node *n, *p;
4398         u64 last_tid = atomic64_read(&osdc->last_tid);
4399
4400 again:
4401         down_read(&osdc->lock);
4402         for (n = rb_first(&osdc->osds); n; n = rb_next(n)) {
4403                 struct ceph_osd *osd = rb_entry(n, struct ceph_osd, o_node);
4404
4405                 mutex_lock(&osd->lock);
4406                 for (p = rb_first(&osd->o_requests); p; p = rb_next(p)) {
4407                         struct ceph_osd_request *req =
4408                             rb_entry(p, struct ceph_osd_request, r_node);
4409
4410                         if (req->r_tid > last_tid)
4411                                 break;
4412
4413                         if (!(req->r_flags & CEPH_OSD_FLAG_WRITE))
4414                                 continue;
4415
4416                         ceph_osdc_get_request(req);
4417                         mutex_unlock(&osd->lock);
4418                         up_read(&osdc->lock);
4419                         dout("%s waiting on req %p tid %llu last_tid %llu\n",
4420                              __func__, req, req->r_tid, last_tid);
4421                         wait_for_completion(&req->r_completion);
4422                         ceph_osdc_put_request(req);
4423                         goto again;
4424                 }
4425
4426                 mutex_unlock(&osd->lock);
4427         }
4428
4429         up_read(&osdc->lock);
4430         dout("%s done last_tid %llu\n", __func__, last_tid);
4431 }
4432 EXPORT_SYMBOL(ceph_osdc_sync);
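
/*
 * Sketch (hypothetical helper): quiescing a client before teardown -
 * flush in-flight writes, then drain any queued watch/notify callbacks
 * (ceph_osdc_flush_notifies() is defined later in this file).
 */
static void example_quiesce(struct ceph_osd_client *osdc)
{
        ceph_osdc_sync(osdc);
        ceph_osdc_flush_notifies(osdc);
}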
4433
4434 static struct ceph_osd_request *
4435 alloc_linger_request(struct ceph_osd_linger_request *lreq)
4436 {
4437         struct ceph_osd_request *req;
4438
4439         req = ceph_osdc_alloc_request(lreq->osdc, NULL, 1, false, GFP_NOIO);
4440         if (!req)
4441                 return NULL;
4442
4443         ceph_oid_copy(&req->r_base_oid, &lreq->t.base_oid);
4444         ceph_oloc_copy(&req->r_base_oloc, &lreq->t.base_oloc);
4445
4446         if (ceph_osdc_alloc_messages(req, GFP_NOIO)) {
4447                 ceph_osdc_put_request(req);
4448                 return NULL;
4449         }
4450
4451         return req;
4452 }
4453
4454 /*
4455  * Returns a handle, caller owns a ref.
4456  */
4457 struct ceph_osd_linger_request *
4458 ceph_osdc_watch(struct ceph_osd_client *osdc,
4459                 struct ceph_object_id *oid,
4460                 struct ceph_object_locator *oloc,
4461                 rados_watchcb2_t wcb,
4462                 rados_watcherrcb_t errcb,
4463                 void *data)
4464 {
4465         struct ceph_osd_linger_request *lreq;
4466         int ret;
4467
4468         lreq = linger_alloc(osdc);
4469         if (!lreq)
4470                 return ERR_PTR(-ENOMEM);
4471
4472         lreq->is_watch = true;
4473         lreq->wcb = wcb;
4474         lreq->errcb = errcb;
4475         lreq->data = data;
4476         lreq->watch_valid_thru = jiffies;
4477
4478         ceph_oid_copy(&lreq->t.base_oid, oid);
4479         ceph_oloc_copy(&lreq->t.base_oloc, oloc);
4480         lreq->t.flags = CEPH_OSD_FLAG_WRITE;
4481         ktime_get_real_ts(&lreq->mtime);
4482
4483         lreq->reg_req = alloc_linger_request(lreq);
4484         if (!lreq->reg_req) {
4485                 ret = -ENOMEM;
4486                 goto err_put_lreq;
4487         }
4488
4489         lreq->ping_req = alloc_linger_request(lreq);
4490         if (!lreq->ping_req) {
4491                 ret = -ENOMEM;
4492                 goto err_put_lreq;
4493         }
4494
4495         down_write(&osdc->lock);
4496         linger_register(lreq); /* before osd_req_op_* */
4497         osd_req_op_watch_init(lreq->reg_req, 0, lreq->linger_id,
4498                               CEPH_OSD_WATCH_OP_WATCH);
4499         osd_req_op_watch_init(lreq->ping_req, 0, lreq->linger_id,
4500                               CEPH_OSD_WATCH_OP_PING);
4501         linger_submit(lreq);
4502         up_write(&osdc->lock);
4503
4504         ret = linger_reg_commit_wait(lreq);
4505         if (ret) {
4506                 linger_cancel(lreq);
4507                 goto err_put_lreq;
4508         }
4509
4510         return lreq;
4511
4512 err_put_lreq:
4513         linger_put(lreq);
4514         return ERR_PTR(ret);
4515 }
4516 EXPORT_SYMBOL(ceph_osdc_watch);
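
/*
 * Illustrative sketch, not part of the original file: a minimal watcher.
 * The callback signatures follow rados_watchcb2_t/rados_watcherrcb_t from
 * <linux/ceph/osd_client.h>; struct example_watcher is hypothetical.
 */
struct example_watcher {
        struct ceph_osd_client *osdc;
        struct ceph_object_id oid;
        struct ceph_object_locator oloc;
        struct ceph_osd_linger_request *handle;
};

static void example_watch_cb(void *arg, u64 notify_id, u64 cookie,
                             u64 notifier_id, void *data, size_t data_len)
{
        struct example_watcher *w = arg;

        /* ack promptly - the notifier blocks until all watchers ack or
         * its timeout expires */
        ceph_osdc_notify_ack(w->osdc, &w->oid, &w->oloc, notify_id, cookie,
                             NULL, 0);
}

static void example_watch_errcb(void *arg, u64 cookie, int err)
{
        pr_warn("example watch error %d\n", err);
}

static int example_watch(struct example_watcher *w)
{
        w->handle = ceph_osdc_watch(w->osdc, &w->oid, &w->oloc,
                                    example_watch_cb, example_watch_errcb, w);
        return PTR_ERR_OR_ZERO(w->handle);
}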
4517
4518 /*
4519  * Releases a ref.
4520  *
4521  * Times out after mount_timeout to preserve rbd unmap behaviour
4522  * introduced in 2894e1d76974 ("rbd: timeout watch teardown on unmap
4523  * with mount_timeout").
4524  */
4525 int ceph_osdc_unwatch(struct ceph_osd_client *osdc,
4526                       struct ceph_osd_linger_request *lreq)
4527 {
4528         struct ceph_options *opts = osdc->client->options;
4529         struct ceph_osd_request *req;
4530         int ret;
4531
4532         req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO);
4533         if (!req)
4534                 return -ENOMEM;
4535
4536         ceph_oid_copy(&req->r_base_oid, &lreq->t.base_oid);
4537         ceph_oloc_copy(&req->r_base_oloc, &lreq->t.base_oloc);
4538         req->r_flags = CEPH_OSD_FLAG_WRITE;
4539         ktime_get_real_ts(&req->r_mtime);
4540         osd_req_op_watch_init(req, 0, lreq->linger_id,
4541                               CEPH_OSD_WATCH_OP_UNWATCH);
4542
4543         ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
4544         if (ret)
4545                 goto out_put_req;
4546
4547         ceph_osdc_start_request(osdc, req, false);
4548         linger_cancel(lreq);
4549         linger_put(lreq);
4550         ret = wait_request_timeout(req, opts->mount_timeout);
4551
4552 out_put_req:
4553         ceph_osdc_put_request(req);
4554         return ret;
4555 }
4556 EXPORT_SYMBOL(ceph_osdc_unwatch);
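
/*
 * Sketch: teardown for the watcher from the example above; the handle
 * (and its ref) is consumed even if the UNWATCH request times out.
 */
static void example_unwatch(struct example_watcher *w)
{
        int ret = ceph_osdc_unwatch(w->osdc, w->handle);

        if (ret)
                pr_warn("example unwatch failed: %d\n", ret);
        w->handle = NULL;
}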
4557
4558 static int osd_req_op_notify_ack_init(struct ceph_osd_request *req, int which,
4559                                       u64 notify_id, u64 cookie, void *payload,
4560                                       size_t payload_len)
4561 {
4562         struct ceph_osd_req_op *op;
4563         struct ceph_pagelist *pl;
4564         int ret;
4565
4566         op = _osd_req_op_init(req, which, CEPH_OSD_OP_NOTIFY_ACK, 0);
4567
4568         pl = kmalloc(sizeof(*pl), GFP_NOIO);
4569         if (!pl)
4570                 return -ENOMEM;
4571
4572         ceph_pagelist_init(pl);
4573         ret = ceph_pagelist_encode_64(pl, notify_id);
4574         ret |= ceph_pagelist_encode_64(pl, cookie);
4575         if (payload) {
4576                 ret |= ceph_pagelist_encode_32(pl, payload_len);
4577                 ret |= ceph_pagelist_append(pl, payload, payload_len);
4578         } else {
4579                 ret |= ceph_pagelist_encode_32(pl, 0);
4580         }
4581         if (ret) {
4582                 ceph_pagelist_release(pl);
4583                 return -ENOMEM;
4584         }
4585
4586         ceph_osd_data_pagelist_init(&op->notify_ack.request_data, pl);
4587         op->indata_len = pl->length;
4588         return 0;
4589 }
4590
4591 int ceph_osdc_notify_ack(struct ceph_osd_client *osdc,
4592                          struct ceph_object_id *oid,
4593                          struct ceph_object_locator *oloc,
4594                          u64 notify_id,
4595                          u64 cookie,
4596                          void *payload,
4597                          size_t payload_len)
4598 {
4599         struct ceph_osd_request *req;
4600         int ret;
4601
4602         req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO);
4603         if (!req)
4604                 return -ENOMEM;
4605
4606         ceph_oid_copy(&req->r_base_oid, oid);
4607         ceph_oloc_copy(&req->r_base_oloc, oloc);
4608         req->r_flags = CEPH_OSD_FLAG_READ;
4609
4610         ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
4611         if (ret)
4612                 goto out_put_req;
4613
4614         ret = osd_req_op_notify_ack_init(req, 0, notify_id, cookie, payload,
4615                                          payload_len);
4616         if (ret)
4617                 goto out_put_req;
4618
4619         ceph_osdc_start_request(osdc, req, false);
4620         ret = ceph_osdc_wait_request(osdc, req);
4621
4622 out_put_req:
4623         ceph_osdc_put_request(req);
4624         return ret;
4625 }
4626 EXPORT_SYMBOL(ceph_osdc_notify_ack);
4627
4628 static int osd_req_op_notify_init(struct ceph_osd_request *req, int which,
4629                                   u64 cookie, u32 prot_ver, u32 timeout,
4630                                   void *payload, size_t payload_len)
4631 {
4632         struct ceph_osd_req_op *op;
4633         struct ceph_pagelist *pl;
4634         int ret;
4635
4636         op = _osd_req_op_init(req, which, CEPH_OSD_OP_NOTIFY, 0);
4637         op->notify.cookie = cookie;
4638
4639         pl = kmalloc(sizeof(*pl), GFP_NOIO);
4640         if (!pl)
4641                 return -ENOMEM;
4642
4643         ceph_pagelist_init(pl);
4644         ret = ceph_pagelist_encode_32(pl, 1); /* prot_ver */
4645         ret |= ceph_pagelist_encode_32(pl, timeout);
4646         ret |= ceph_pagelist_encode_32(pl, payload_len);
4647         ret |= ceph_pagelist_append(pl, payload, payload_len);
4648         if (ret) {
4649                 ceph_pagelist_release(pl);
4650                 return -ENOMEM;
4651         }
4652
4653         ceph_osd_data_pagelist_init(&op->notify.request_data, pl);
4654         op->indata_len = pl->length;
4655         return 0;
4656 }
4657
4658 /*
4659  * @timeout: in seconds
4660  *
4661  * @preply_{pages,len} are initialized both on success and error.
4662  * The caller is responsible for:
4663  *
4664  *     ceph_release_page_vector(reply_pages, calc_pages_for(0, reply_len))
4665  */
4666 int ceph_osdc_notify(struct ceph_osd_client *osdc,
4667                      struct ceph_object_id *oid,
4668                      struct ceph_object_locator *oloc,
4669                      void *payload,
4670                      size_t payload_len,
4671                      u32 timeout,
4672                      struct page ***preply_pages,
4673                      size_t *preply_len)
4674 {
4675         struct ceph_osd_linger_request *lreq;
4676         struct page **pages;
4677         int ret;
4678
4679         WARN_ON(!timeout);
4680         if (preply_pages) {
4681                 *preply_pages = NULL;
4682                 *preply_len = 0;
4683         }
4684
4685         lreq = linger_alloc(osdc);
4686         if (!lreq)
4687                 return -ENOMEM;
4688
4689         lreq->preply_pages = preply_pages;
4690         lreq->preply_len = preply_len;
4691
4692         ceph_oid_copy(&lreq->t.base_oid, oid);
4693         ceph_oloc_copy(&lreq->t.base_oloc, oloc);
4694         lreq->t.flags = CEPH_OSD_FLAG_READ;
4695
4696         lreq->reg_req = alloc_linger_request(lreq);
4697         if (!lreq->reg_req) {
4698                 ret = -ENOMEM;
4699                 goto out_put_lreq;
4700         }
4701
4702         /* for notify_id */
4703         pages = ceph_alloc_page_vector(1, GFP_NOIO);
4704         if (IS_ERR(pages)) {
4705                 ret = PTR_ERR(pages);
4706                 goto out_put_lreq;
4707         }
4708
4709         down_write(&osdc->lock);
4710         linger_register(lreq); /* before osd_req_op_* */
4711         ret = osd_req_op_notify_init(lreq->reg_req, 0, lreq->linger_id, 1,
4712                                      timeout, payload, payload_len);
4713         if (ret) {
4714                 linger_unregister(lreq);
4715                 up_write(&osdc->lock);
4716                 ceph_release_page_vector(pages, 1);
4717                 goto out_put_lreq;
4718         }
4719         ceph_osd_data_pages_init(osd_req_op_data(lreq->reg_req, 0, notify,
4720                                                  response_data),
4721                                  pages, PAGE_SIZE, 0, false, true);
4722         linger_submit(lreq);
4723         up_write(&osdc->lock);
4724
4725         ret = linger_reg_commit_wait(lreq);
4726         if (!ret)
4727                 ret = linger_notify_finish_wait(lreq);
4728         else
4729                 dout("lreq %p failed to initiate notify %d\n", lreq, ret);
4730
4731         linger_cancel(lreq);
4732 out_put_lreq:
4733         linger_put(lreq);
4734         return ret;
4735 }
4736 EXPORT_SYMBOL(ceph_osdc_notify);
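
/*
 * Sketch (hypothetical caller): sending a notify and releasing the reply,
 * mirroring the contract documented above - reply pages must be freed
 * with ceph_release_page_vector() whether or not the notify succeeded.
 */
static int example_notify(struct ceph_osd_client *osdc,
                          struct ceph_object_id *oid,
                          struct ceph_object_locator *oloc)
{
        struct page **reply_pages;
        size_t reply_len;
        int ret;

        ret = ceph_osdc_notify(osdc, oid, oloc, "ping", 4, 10 /* secs */,
                               &reply_pages, &reply_len);
        if (reply_pages)
                ceph_release_page_vector(reply_pages,
                                         calc_pages_for(0, reply_len));
        return ret;
}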
4737
4738 /*
4739  * Return the number of milliseconds since the watch was last
4740  * confirmed, or an error.  If there is an error, the watch is no
4741  * longer valid, and should be destroyed with ceph_osdc_unwatch().
4742  */
4743 int ceph_osdc_watch_check(struct ceph_osd_client *osdc,
4744                           struct ceph_osd_linger_request *lreq)
4745 {
4746         unsigned long stamp, age;
4747         int ret;
4748
4749         down_read(&osdc->lock);
4750         mutex_lock(&lreq->lock);
4751         stamp = lreq->watch_valid_thru;
4752         if (!list_empty(&lreq->pending_lworks)) {
4753                 struct linger_work *lwork =
4754                     list_first_entry(&lreq->pending_lworks,
4755                                      struct linger_work,
4756                                      pending_item);
4757
4758                 if (time_before(lwork->queued_stamp, stamp))
4759                         stamp = lwork->queued_stamp;
4760         }
4761         age = jiffies - stamp;
4762         dout("%s lreq %p linger_id %llu age %lu last_error %d\n", __func__,
4763              lreq, lreq->linger_id, age, lreq->last_error);
4764         /* we are truncating to msecs, so return a safe upper bound */
4765         ret = lreq->last_error ?: 1 + jiffies_to_msecs(age);
4766
4767         mutex_unlock(&lreq->lock);
4768         up_read(&osdc->lock);
4769         return ret;
4770 }
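
/*
 * Sketch (hypothetical policy): using the age returned above to decide
 * whether a watch is still trustworthy; the 30s threshold is arbitrary.
 */
static bool example_watch_alive(struct ceph_osd_client *osdc,
                                struct ceph_osd_linger_request *lreq)
{
        int ret = ceph_osdc_watch_check(osdc, lreq);

        if (ret < 0)
                return false;   /* dead - unwatch, then re-watch */

        return ret < 30 * MSEC_PER_SEC;
}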
4771
4772 static int decode_watcher(void **p, void *end, struct ceph_watch_item *item)
4773 {
4774         u8 struct_v;
4775         u32 struct_len;
4776         int ret;
4777
4778         ret = ceph_start_decoding(p, end, 2, "watch_item_t",
4779                                   &struct_v, &struct_len);
4780         if (ret)
4781                 return ret;
4782
4783         ceph_decode_copy(p, &item->name, sizeof(item->name));
4784         item->cookie = ceph_decode_64(p);
4785         *p += 4; /* skip timeout_seconds */
4786         if (struct_v >= 2) {
4787                 ceph_decode_copy(p, &item->addr, sizeof(item->addr));
4788                 ceph_decode_addr(&item->addr);
4789         }
4790
4791         dout("%s %s%llu cookie %llu addr %s\n", __func__,
4792              ENTITY_NAME(item->name), item->cookie,
4793              ceph_pr_addr(&item->addr.in_addr));
4794         return 0;
4795 }
4796
4797 static int decode_watchers(void **p, void *end,
4798                            struct ceph_watch_item **watchers,
4799                            u32 *num_watchers)
4800 {
4801         u8 struct_v;
4802         u32 struct_len;
4803         int i;
4804         int ret;
4805
4806         ret = ceph_start_decoding(p, end, 1, "obj_list_watch_response_t",
4807                                   &struct_v, &struct_len);
4808         if (ret)
4809                 return ret;
4810
4811         *num_watchers = ceph_decode_32(p);
4812         *watchers = kcalloc(*num_watchers, sizeof(**watchers), GFP_NOIO);
4813         if (!*watchers)
4814                 return -ENOMEM;
4815
4816         for (i = 0; i < *num_watchers; i++) {
4817                 ret = decode_watcher(p, end, *watchers + i);
4818                 if (ret) {
4819                         kfree(*watchers);
4820                         return ret;
4821                 }
4822         }
4823
4824         return 0;
4825 }
4826
4827 /*
4828  * On success, the caller is responsible for:
4829  *
4830  *     kfree(watchers);
4831  */
4832 int ceph_osdc_list_watchers(struct ceph_osd_client *osdc,
4833                             struct ceph_object_id *oid,
4834                             struct ceph_object_locator *oloc,
4835                             struct ceph_watch_item **watchers,
4836                             u32 *num_watchers)
4837 {
4838         struct ceph_osd_request *req;
4839         struct page **pages;
4840         int ret;
4841
4842         req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO);
4843         if (!req)
4844                 return -ENOMEM;
4845
4846         ceph_oid_copy(&req->r_base_oid, oid);
4847         ceph_oloc_copy(&req->r_base_oloc, oloc);
4848         req->r_flags = CEPH_OSD_FLAG_READ;
4849
4850         ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
4851         if (ret)
4852                 goto out_put_req;
4853
4854         pages = ceph_alloc_page_vector(1, GFP_NOIO);
4855         if (IS_ERR(pages)) {
4856                 ret = PTR_ERR(pages);
4857                 goto out_put_req;
4858         }
4859
4860         osd_req_op_init(req, 0, CEPH_OSD_OP_LIST_WATCHERS, 0);
4861         ceph_osd_data_pages_init(osd_req_op_data(req, 0, list_watchers,
4862                                                  response_data),
4863                                  pages, PAGE_SIZE, 0, false, true);
4864
4865         ceph_osdc_start_request(osdc, req, false);
4866         ret = ceph_osdc_wait_request(osdc, req);
4867         if (ret >= 0) {
4868                 void *p = page_address(pages[0]);
4869                 void *const end = p + req->r_ops[0].outdata_len;
4870
4871                 ret = decode_watchers(&p, end, watchers, num_watchers);
4872         }
4873
4874 out_put_req:
4875         ceph_osdc_put_request(req);
4876         return ret;
4877 }
4878 EXPORT_SYMBOL(ceph_osdc_list_watchers);
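
/*
 * Sketch (hypothetical helper): enumerating watchers and freeing the
 * array per the contract documented above.
 */
static void example_dump_watchers(struct ceph_osd_client *osdc,
                                  struct ceph_object_id *oid,
                                  struct ceph_object_locator *oloc)
{
        struct ceph_watch_item *watchers;
        u32 num_watchers, i;

        if (ceph_osdc_list_watchers(osdc, oid, oloc, &watchers,
                                    &num_watchers))
                return;

        for (i = 0; i < num_watchers; i++)
                dout("watcher %s%llu cookie %llu\n",
                     ENTITY_NAME(watchers[i].name), watchers[i].cookie);
        kfree(watchers);
}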
4879
4880 /*
4881  * Call all pending notify callbacks - for use after a watch is
4882  * unregistered, to make sure no more callbacks for it will be invoked
4883  */
4884 void ceph_osdc_flush_notifies(struct ceph_osd_client *osdc)
4885 {
4886         dout("%s osdc %p\n", __func__, osdc);
4887         flush_workqueue(osdc->notify_wq);
4888 }
4889 EXPORT_SYMBOL(ceph_osdc_flush_notifies);
4890
4891 void ceph_osdc_maybe_request_map(struct ceph_osd_client *osdc)
4892 {
4893         down_read(&osdc->lock);
4894         maybe_request_map(osdc);
4895         up_read(&osdc->lock);
4896 }
4897 EXPORT_SYMBOL(ceph_osdc_maybe_request_map);
4898
4899 /*
4900  * Execute an OSD class method on an object.
4901  *
4902  * @flags: CEPH_OSD_FLAG_*
4903  * @resp_len: in/out param for reply length
4904  */
4905 int ceph_osdc_call(struct ceph_osd_client *osdc,
4906                    struct ceph_object_id *oid,
4907                    struct ceph_object_locator *oloc,
4908                    const char *class, const char *method,
4909                    unsigned int flags,
4910                    struct page *req_page, size_t req_len,
4911                    struct page *resp_page, size_t *resp_len)
4912 {
4913         struct ceph_osd_request *req;
4914         int ret;
4915
4916         if (req_len > PAGE_SIZE || (resp_page && *resp_len > PAGE_SIZE))
4917                 return -E2BIG;
4918
4919         req = ceph_osdc_alloc_request(osdc, NULL, 1, false, GFP_NOIO);
4920         if (!req)
4921                 return -ENOMEM;
4922
4923         ceph_oid_copy(&req->r_base_oid, oid);
4924         ceph_oloc_copy(&req->r_base_oloc, oloc);
4925         req->r_flags = flags;
4926
4927         ret = ceph_osdc_alloc_messages(req, GFP_NOIO);
4928         if (ret)
4929                 goto out_put_req;
4930
4931         osd_req_op_cls_init(req, 0, CEPH_OSD_OP_CALL, class, method);
4932         if (req_page)
4933                 osd_req_op_cls_request_data_pages(req, 0, &req_page, req_len,
4934                                                   0, false, false);
4935         if (resp_page)
4936                 osd_req_op_cls_response_data_pages(req, 0, &resp_page,
4937                                                    *resp_len, 0, false, false);
4938
4939         ceph_osdc_start_request(osdc, req, false);
4940         ret = ceph_osdc_wait_request(osdc, req);
4941         if (ret >= 0) {
4942                 ret = req->r_ops[0].rval;
4943                 if (resp_page)
4944                         *resp_len = req->r_ops[0].outdata_len;
4945         }
4946
4947 out_put_req:
4948         ceph_osdc_put_request(req);
4949         return ret;
4950 }
4951 EXPORT_SYMBOL(ceph_osdc_call);
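
/*
 * Sketch: invoking a class method with a single-page reply.  "example"
 * and "get_id" are made-up class/method names - real ones depend on the
 * class plugins loaded on the OSDs.
 */
static int example_cls_call(struct ceph_osd_client *osdc,
                            struct ceph_object_id *oid,
                            struct ceph_object_locator *oloc)
{
        struct page *resp_page;
        size_t resp_len = PAGE_SIZE;
        int ret;

        resp_page = alloc_page(GFP_NOIO);
        if (!resp_page)
                return -ENOMEM;

        ret = ceph_osdc_call(osdc, oid, oloc, "example", "get_id",
                             CEPH_OSD_FLAG_READ, NULL, 0,
                             resp_page, &resp_len);
        if (ret >= 0)
                dout("cls call rval %d, %zu reply bytes\n", ret, resp_len);

        __free_page(resp_page);
        return ret;
}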
4952
4953 /*
4954  * init, shutdown
4955  */
4956 int ceph_osdc_init(struct ceph_osd_client *osdc, struct ceph_client *client)
4957 {
4958         int err;
4959
4960         dout("init\n");
4961         osdc->client = client;
4962         init_rwsem(&osdc->lock);
4963         osdc->osds = RB_ROOT;
4964         INIT_LIST_HEAD(&osdc->osd_lru);
4965         spin_lock_init(&osdc->osd_lru_lock);
4966         osd_init(&osdc->homeless_osd);
4967         osdc->homeless_osd.o_osdc = osdc;
4968         osdc->homeless_osd.o_osd = CEPH_HOMELESS_OSD;
4969         osdc->last_linger_id = CEPH_LINGER_ID_START;
4970         osdc->linger_requests = RB_ROOT;
4971         osdc->map_checks = RB_ROOT;
4972         osdc->linger_map_checks = RB_ROOT;
4973         INIT_DELAYED_WORK(&osdc->timeout_work, handle_timeout);
4974         INIT_DELAYED_WORK(&osdc->osds_timeout_work, handle_osds_timeout);
4975
4976         err = -ENOMEM;
4977         osdc->osdmap = ceph_osdmap_alloc();
4978         if (!osdc->osdmap)
4979                 goto out;
4980
4981         osdc->req_mempool = mempool_create_slab_pool(10,
4982                                                      ceph_osd_request_cache);
4983         if (!osdc->req_mempool)
4984                 goto out_map;
4985
4986         err = ceph_msgpool_init(&osdc->msgpool_op, CEPH_MSG_OSD_OP,
4987                                 PAGE_SIZE, 10, true, "osd_op");
4988         if (err < 0)
4989                 goto out_mempool;
4990         err = ceph_msgpool_init(&osdc->msgpool_op_reply, CEPH_MSG_OSD_OPREPLY,
4991                                 PAGE_SIZE, 10, true, "osd_op_reply");
4992         if (err < 0)
4993                 goto out_msgpool;
4994
4995         err = -ENOMEM;
4996         osdc->notify_wq = create_singlethread_workqueue("ceph-watch-notify");
4997         if (!osdc->notify_wq)
4998                 goto out_msgpool_reply;
4999
5000         schedule_delayed_work(&osdc->timeout_work,
5001                               osdc->client->options->osd_keepalive_timeout);
5002         schedule_delayed_work(&osdc->osds_timeout_work,
5003             round_jiffies_relative(osdc->client->options->osd_idle_ttl));
5004
5005         return 0;
5006
5007 out_msgpool_reply:
5008         ceph_msgpool_destroy(&osdc->msgpool_op_reply);
5009 out_msgpool:
5010         ceph_msgpool_destroy(&osdc->msgpool_op);
5011 out_mempool:
5012         mempool_destroy(osdc->req_mempool);
5013 out_map:
5014         ceph_osdmap_destroy(osdc->osdmap);
5015 out:
5016         return err;
5017 }
5018
5019 void ceph_osdc_stop(struct ceph_osd_client *osdc)
5020 {
5021         flush_workqueue(osdc->notify_wq);
5022         destroy_workqueue(osdc->notify_wq);
5023         cancel_delayed_work_sync(&osdc->timeout_work);
5024         cancel_delayed_work_sync(&osdc->osds_timeout_work);
5025
5026         down_write(&osdc->lock);
5027         while (!RB_EMPTY_ROOT(&osdc->osds)) {
5028                 struct ceph_osd *osd = rb_entry(rb_first(&osdc->osds),
5029                                                 struct ceph_osd, o_node);
5030                 close_osd(osd);
5031         }
5032         up_write(&osdc->lock);
5033         WARN_ON(refcount_read(&osdc->homeless_osd.o_ref) != 1);
5034         osd_cleanup(&osdc->homeless_osd);
5035
5036         WARN_ON(!list_empty(&osdc->osd_lru));
5037         WARN_ON(!RB_EMPTY_ROOT(&osdc->linger_requests));
5038         WARN_ON(!RB_EMPTY_ROOT(&osdc->map_checks));
5039         WARN_ON(!RB_EMPTY_ROOT(&osdc->linger_map_checks));
5040         WARN_ON(atomic_read(&osdc->num_requests));
5041         WARN_ON(atomic_read(&osdc->num_homeless));
5042
5043         ceph_osdmap_destroy(osdc->osdmap);
5044         mempool_destroy(osdc->req_mempool);
5045         ceph_msgpool_destroy(&osdc->msgpool_op);
5046         ceph_msgpool_destroy(&osdc->msgpool_op_reply);
5047 }
5048
5049 /*
5050  * Read some contiguous pages.  If we cross a stripe boundary, shorten
5051  * *plen.  Return number of bytes read, or error.
5052  */
5053 int ceph_osdc_readpages(struct ceph_osd_client *osdc,
5054                         struct ceph_vino vino, struct ceph_file_layout *layout,
5055                         u64 off, u64 *plen,
5056                         u32 truncate_seq, u64 truncate_size,
5057                         struct page **pages, int num_pages, int page_align)
5058 {
5059         struct ceph_osd_request *req;
5060         int rc = 0;
5061
5062         dout("readpages on ino %llx.%llx on %llu~%llu\n", vino.ino,
5063              vino.snap, off, *plen);
5064         req = ceph_osdc_new_request(osdc, layout, vino, off, plen, 0, 1,
5065                                     CEPH_OSD_OP_READ, CEPH_OSD_FLAG_READ,
5066                                     NULL, truncate_seq, truncate_size,
5067                                     false);
5068         if (IS_ERR(req))
5069                 return PTR_ERR(req);
5070
5071         /* it may be a short read due to an object boundary */
5072         osd_req_op_extent_osd_data_pages(req, 0,
5073                                 pages, *plen, page_align, false, false);
5074
5075         dout("readpages  final extent is %llu~%llu (%llu bytes align %d)\n",
5076              off, *plen, *plen, page_align);
5077
5078         rc = ceph_osdc_start_request(osdc, req, false);
5079         if (!rc)
5080                 rc = ceph_osdc_wait_request(osdc, req);
5081
5082         ceph_osdc_put_request(req);
5083         dout("readpages result %d\n", rc);
5084         return rc;
5085 }
5086 EXPORT_SYMBOL(ceph_osdc_readpages);
5087
5088 /*
5089  * do a synchronous write on N pages
5090  */
5091 int ceph_osdc_writepages(struct ceph_osd_client *osdc, struct ceph_vino vino,
5092                          struct ceph_file_layout *layout,
5093                          struct ceph_snap_context *snapc,
5094                          u64 off, u64 len,
5095                          u32 truncate_seq, u64 truncate_size,
5096                          struct timespec *mtime,
5097                          struct page **pages, int num_pages)
5098 {
5099         struct ceph_osd_request *req;
5100         int rc = 0;
5101         int page_align = off & ~PAGE_MASK;
5102
5103         req = ceph_osdc_new_request(osdc, layout, vino, off, &len, 0, 1,
5104                                     CEPH_OSD_OP_WRITE, CEPH_OSD_FLAG_WRITE,
5105                                     snapc, truncate_seq, truncate_size,
5106                                     true);
5107         if (IS_ERR(req))
5108                 return PTR_ERR(req);
5109
5110         /* it may be a short write due to an object boundary */
5111         osd_req_op_extent_osd_data_pages(req, 0, pages, len, page_align,
5112                                 false, false);
5113         dout("writepages %llu~%llu (%llu bytes)\n", off, len, len);
5114
5115         req->r_mtime = *mtime;
5116         rc = ceph_osdc_start_request(osdc, req, true);
5117         if (!rc)
5118                 rc = ceph_osdc_wait_request(osdc, req);
5119
5120         ceph_osdc_put_request(req);
5121         if (rc == 0)
5122                 rc = len;
5123         dout("writepages result %d\n", rc);
5124         return rc;
5125 }
5126 EXPORT_SYMBOL(ceph_osdc_writepages);
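
/*
 * Sketch (hypothetical caller): a synchronous single-page write followed
 * by a read-back via the two helpers above.  @vino, @layout and @snapc
 * must come from a real inode and snap context.
 */
static int example_rw_roundtrip(struct ceph_osd_client *osdc,
                                struct ceph_vino vino,
                                struct ceph_file_layout *layout,
                                struct ceph_snap_context *snapc,
                                struct page **pages)
{
        struct timespec mtime;
        u64 len = PAGE_SIZE;
        int ret;

        ktime_get_real_ts(&mtime);
        ret = ceph_osdc_writepages(osdc, vino, layout, snapc, 0, PAGE_SIZE,
                                   0, 0, &mtime, pages, 1);
        if (ret < 0)
                return ret;

        return ceph_osdc_readpages(osdc, vino, layout, 0, &len, 0, 0,
                                   pages, 1, 0);
}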
5127
5128 int __init ceph_osdc_setup(void)
5129 {
5130         size_t size = sizeof(struct ceph_osd_request) +
5131             CEPH_OSD_SLAB_OPS * sizeof(struct ceph_osd_req_op);
5132
5133         BUG_ON(ceph_osd_request_cache);
5134         ceph_osd_request_cache = kmem_cache_create("ceph_osd_request", size,
5135                                                    0, 0, NULL);
5136
5137         return ceph_osd_request_cache ? 0 : -ENOMEM;
5138 }
5139
5140 void ceph_osdc_cleanup(void)
5141 {
5142         BUG_ON(!ceph_osd_request_cache);
5143         kmem_cache_destroy(ceph_osd_request_cache);
5144         ceph_osd_request_cache = NULL;
5145 }
5146
5147 /*
5148  * handle incoming message
5149  */
5150 static void dispatch(struct ceph_connection *con, struct ceph_msg *msg)
5151 {
5152         struct ceph_osd *osd = con->private;
5153         struct ceph_osd_client *osdc = osd->o_osdc;
5154         int type = le16_to_cpu(msg->hdr.type);
5155
5156         switch (type) {
5157         case CEPH_MSG_OSD_MAP:
5158                 ceph_osdc_handle_map(osdc, msg);
5159                 break;
5160         case CEPH_MSG_OSD_OPREPLY:
5161                 handle_reply(osd, msg);
5162                 break;
5163         case CEPH_MSG_OSD_BACKOFF:
5164                 handle_backoff(osd, msg);
5165                 break;
5166         case CEPH_MSG_WATCH_NOTIFY:
5167                 handle_watch_notify(osdc, msg);
5168                 break;
5169
5170         default:
5171                 pr_err("received unknown message type %d %s\n", type,
5172                        ceph_msg_type_name(type));
5173         }
5174
5175         ceph_msg_put(msg);
5176 }
5177
5178 /*
5179  * Look up and return the message for an incoming reply.  We don't yet
5180  * handle a data portion larger than what was preallocated - for now,
5181  * such a message is simply skipped.
5182  */
5183 static struct ceph_msg *get_reply(struct ceph_connection *con,
5184                                   struct ceph_msg_header *hdr,
5185                                   int *skip)
5186 {
5187         struct ceph_osd *osd = con->private;
5188         struct ceph_osd_client *osdc = osd->o_osdc;
5189         struct ceph_msg *m = NULL;
5190         struct ceph_osd_request *req;
5191         int front_len = le32_to_cpu(hdr->front_len);
5192         int data_len = le32_to_cpu(hdr->data_len);
5193         u64 tid = le64_to_cpu(hdr->tid);
5194
5195         down_read(&osdc->lock);
5196         if (!osd_registered(osd)) {
5197                 dout("%s osd%d unknown, skipping\n", __func__, osd->o_osd);
5198                 *skip = 1;
5199                 goto out_unlock_osdc;
5200         }
5201         WARN_ON(osd->o_osd != le64_to_cpu(hdr->src.num));
5202
5203         mutex_lock(&osd->lock);
5204         req = lookup_request(&osd->o_requests, tid);
5205         if (!req) {
5206                 dout("%s osd%d tid %llu unknown, skipping\n", __func__,
5207                      osd->o_osd, tid);
5208                 *skip = 1;
5209                 goto out_unlock_session;
5210         }
5211
5212         ceph_msg_revoke_incoming(req->r_reply);
5213
5214         if (front_len > req->r_reply->front_alloc_len) {
5215                 pr_warn("%s osd%d tid %llu front %d > preallocated %d\n",
5216                         __func__, osd->o_osd, req->r_tid, front_len,
5217                         req->r_reply->front_alloc_len);
5218                 m = ceph_msg_new(CEPH_MSG_OSD_OPREPLY, front_len, GFP_NOFS,
5219                                  false);
5220                 if (!m)
5221                         goto out_unlock_session;
5222                 ceph_msg_put(req->r_reply);
5223                 req->r_reply = m;
5224         }
5225
5226         if (data_len > req->r_reply->data_length) {
5227                 pr_warn("%s osd%d tid %llu data %d > preallocated %zu, skipping\n",
5228                         __func__, osd->o_osd, req->r_tid, data_len,
5229                         req->r_reply->data_length);
5230                 m = NULL;
5231                 *skip = 1;
5232                 goto out_unlock_session;
5233         }
5234
5235         m = ceph_msg_get(req->r_reply);
5236         dout("get_reply tid %lld %p\n", tid, m);
5237
5238 out_unlock_session:
5239         mutex_unlock(&osd->lock);
5240 out_unlock_osdc:
5241         up_read(&osdc->lock);
5242         return m;
5243 }
5244
5245 /*
5246  * TODO: switch to a msg-owned pagelist
5247  */
5248 static struct ceph_msg *alloc_msg_with_page_vector(struct ceph_msg_header *hdr)
5249 {
5250         struct ceph_msg *m;
5251         int type = le16_to_cpu(hdr->type);
5252         u32 front_len = le32_to_cpu(hdr->front_len);
5253         u32 data_len = le32_to_cpu(hdr->data_len);
5254
5255         m = ceph_msg_new(type, front_len, GFP_NOIO, false);
5256         if (!m)
5257                 return NULL;
5258
5259         if (data_len) {
5260                 struct page **pages;
5261                 struct ceph_osd_data osd_data;
5262
5263                 pages = ceph_alloc_page_vector(calc_pages_for(0, data_len),
5264                                                GFP_NOIO);
5265                 if (IS_ERR(pages)) {
5266                         ceph_msg_put(m);
5267                         return NULL;
5268                 }
5269
5270                 ceph_osd_data_pages_init(&osd_data, pages, data_len, 0, false,
5271                                          false);
5272                 ceph_osdc_msg_data_add(m, &osd_data);
5273         }
5274
5275         return m;
5276 }
5277
5278 static struct ceph_msg *alloc_msg(struct ceph_connection *con,
5279                                   struct ceph_msg_header *hdr,
5280                                   int *skip)
5281 {
5282         struct ceph_osd *osd = con->private;
5283         int type = le16_to_cpu(hdr->type);
5284
5285         *skip = 0;
5286         switch (type) {
5287         case CEPH_MSG_OSD_MAP:
5288         case CEPH_MSG_OSD_BACKOFF:
5289         case CEPH_MSG_WATCH_NOTIFY:
5290                 return alloc_msg_with_page_vector(hdr);
5291         case CEPH_MSG_OSD_OPREPLY:
5292                 return get_reply(con, hdr, skip);
5293         default:
5294                 pr_warn("%s osd%d unknown msg type %d, skipping\n", __func__,
5295                         osd->o_osd, type);
5296                 *skip = 1;
5297                 return NULL;
5298         }
5299 }
5300
5301 /*
5302  * Wrappers to take and drop a reference on the containing ceph_osd
5303  */
5304 static struct ceph_connection *get_osd_con(struct ceph_connection *con)
5305 {
5306         struct ceph_osd *osd = con->private;
5307         if (get_osd(osd))
5308                 return con;
5309         return NULL;
5310 }
5311
5312 static void put_osd_con(struct ceph_connection *con)
5313 {
5314         struct ceph_osd *osd = con->private;
5315         put_osd(osd);
5316 }
5317
5318 /*
5319  * authentication
5320  */
5321 /*
5322  * Note: returned pointer is the address of a structure that's
5323  * managed separately.  Caller must *not* attempt to free it.
5324  */
5325 static struct ceph_auth_handshake *get_authorizer(struct ceph_connection *con,
5326                                         int *proto, int force_new)
5327 {
5328         struct ceph_osd *o = con->private;
5329         struct ceph_osd_client *osdc = o->o_osdc;
5330         struct ceph_auth_client *ac = osdc->client->monc.auth;
5331         struct ceph_auth_handshake *auth = &o->o_auth;
5332
5333         if (force_new && auth->authorizer) {
5334                 ceph_auth_destroy_authorizer(auth->authorizer);
5335                 auth->authorizer = NULL;
5336         }
5337         if (!auth->authorizer) {
5338                 int ret = ceph_auth_create_authorizer(ac, CEPH_ENTITY_TYPE_OSD,
5339                                                       auth);
5340                 if (ret)
5341                         return ERR_PTR(ret);
5342         } else {
5343                 int ret = ceph_auth_update_authorizer(ac, CEPH_ENTITY_TYPE_OSD,
5344                                                      auth);
5345                 if (ret)
5346                         return ERR_PTR(ret);
5347         }
5348         *proto = ac->protocol;
5349
5350         return auth;
5351 }
5352
5353
5354 static int verify_authorizer_reply(struct ceph_connection *con)
5355 {
5356         struct ceph_osd *o = con->private;
5357         struct ceph_osd_client *osdc = o->o_osdc;
5358         struct ceph_auth_client *ac = osdc->client->monc.auth;
5359
5360         return ceph_auth_verify_authorizer_reply(ac, o->o_auth.authorizer);
5361 }
5362
5363 static int invalidate_authorizer(struct ceph_connection *con)
5364 {
5365         struct ceph_osd *o = con->private;
5366         struct ceph_osd_client *osdc = o->o_osdc;
5367         struct ceph_auth_client *ac = osdc->client->monc.auth;
5368
5369         ceph_auth_invalidate_authorizer(ac, CEPH_ENTITY_TYPE_OSD);
5370         return ceph_monc_validate_auth(&osdc->client->monc);
5371 }
5372
5373 static void osd_reencode_message(struct ceph_msg *msg)
5374 {
5375         int type = le16_to_cpu(msg->hdr.type);
5376
5377         if (type == CEPH_MSG_OSD_OP)
5378                 encode_request_finish(msg);
5379 }
5380
5381 static int osd_sign_message(struct ceph_msg *msg)
5382 {
5383         struct ceph_osd *o = msg->con->private;
5384         struct ceph_auth_handshake *auth = &o->o_auth;
5385
5386         return ceph_auth_sign_message(auth, msg);
5387 }
5388
5389 static int osd_check_message_signature(struct ceph_msg *msg)
5390 {
5391         struct ceph_osd *o = msg->con->private;
5392         struct ceph_auth_handshake *auth = &o->o_auth;
5393
5394         return ceph_auth_check_message_signature(auth, msg);
5395 }
5396
5397 static const struct ceph_connection_operations osd_con_ops = {
5398         .get = get_osd_con,
5399         .put = put_osd_con,
5400         .dispatch = dispatch,
5401         .get_authorizer = get_authorizer,
5402         .verify_authorizer_reply = verify_authorizer_reply,
5403         .invalidate_authorizer = invalidate_authorizer,
5404         .alloc_msg = alloc_msg,
5405         .reencode_message = osd_reencode_message,
5406         .sign_message = osd_sign_message,
5407         .check_message_signature = osd_check_message_signature,
5408         .fault = osd_fault,
5409 };