staging: lustre: osc: remove ccflags from Makefile
[linux-block.git] drivers/staging/lustre/lustre/osc/osc_request.c
1 /*
2  * GPL HEADER START
3  *
4  * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5  *
6  * This program is free software; you can redistribute it and/or modify
7  * it under the terms of the GNU General Public License version 2 only,
8  * as published by the Free Software Foundation.
9  *
10  * This program is distributed in the hope that it will be useful, but
11  * WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
13  * General Public License version 2 for more details (a copy is included
14  * in the LICENSE file that accompanied this code).
15  *
16  * You should have received a copy of the GNU General Public License
17  * version 2 along with this program; If not, see
18  * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19  *
20  * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21  * CA 95054 USA or visit www.sun.com if you need additional information or
22  * have any questions.
23  *
24  * GPL HEADER END
25  */
26 /*
27  * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
28  * Use is subject to license terms.
29  *
30  * Copyright (c) 2011, 2012, Intel Corporation.
31  */
32 /*
33  * This file is part of Lustre, http://www.lustre.org/
34  * Lustre is a trademark of Sun Microsystems, Inc.
35  */
36
37 #define DEBUG_SUBSYSTEM S_OSC
38
39 #include "../../include/linux/libcfs/libcfs.h"
40
41
42 #include "../include/lustre_dlm.h"
43 #include "../include/lustre_net.h"
44 #include "../include/lustre/lustre_user.h"
45 #include "../include/obd_cksum.h"
46 #include "../include/obd_ost.h"
47
48 #include "../include/lustre_ha.h"
49 #include "../include/lprocfs_status.h"
50 #include "../include/lustre_log.h"
51 #include "../include/lustre_debug.h"
52 #include "../include/lustre_param.h"
53 #include "../include/lustre_fid.h"
54 #include "osc_internal.h"
55 #include "osc_cl_internal.h"
56
57 static void osc_release_ppga(struct brw_page **ppga, obd_count count);
58 static int brw_interpret(const struct lu_env *env,
59                          struct ptlrpc_request *req, void *data, int rc);
60 int osc_cleanup(struct obd_device *obd);
61
62 /* Pack OSC object metadata for disk storage (LE byte order). */
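/*
 * Calling convention, as used by this file's obd_packmd handler:
 *   - lmmp == NULL:                only return the lov_mds_md size needed;
 *   - *lmmp != NULL, lsm == NULL:  free the previously packed buffer;
 *   - otherwise:                   allocate *lmmp if needed and copy lsm_oi
 *                                  into it in little-endian order.
 */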
63 static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
64                       struct lov_stripe_md *lsm)
65 {
66         int lmm_size;
67
68         lmm_size = sizeof(**lmmp);
69         if (lmmp == NULL)
70                 return lmm_size;
71
72         if (*lmmp != NULL && lsm == NULL) {
73                 OBD_FREE(*lmmp, lmm_size);
74                 *lmmp = NULL;
75                 return 0;
76         } else if (unlikely(lsm != NULL && ostid_id(&lsm->lsm_oi) == 0)) {
77                 return -EBADF;
78         }
79
80         if (*lmmp == NULL) {
81                 OBD_ALLOC(*lmmp, lmm_size);
82                 if (*lmmp == NULL)
83                         return -ENOMEM;
84         }
85
86         if (lsm)
87                 ostid_cpu_to_le(&lsm->lsm_oi, &(*lmmp)->lmm_oi);
88
89         return lmm_size;
90 }
91
92 /* Unpack OSC object metadata from disk storage (LE byte order). */
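/*
 * Mirror of osc_packmd(): with lsmp == NULL only the lov_stripe_md size is
 * returned; with *lsmp set and lmm == NULL the stripe md is freed.  Otherwise
 * the object id is converted from LE, and lsm_maxbytes is taken from the
 * import's ocd_maxbytes when OBD_CONNECT_MAXBYTES was negotiated (falling
 * back to LUSTRE_STRIPE_MAXBYTES).
 */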
93 static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
94                         struct lov_mds_md *lmm, int lmm_bytes)
95 {
96         int lsm_size;
97         struct obd_import *imp = class_exp2cliimp(exp);
98
99         if (lmm != NULL) {
100                 if (lmm_bytes < sizeof(*lmm)) {
101                         CERROR("%s: lov_mds_md too small: %d, need %d\n",
102                                exp->exp_obd->obd_name, lmm_bytes,
103                                (int)sizeof(*lmm));
104                         return -EINVAL;
105                 }
106                 /* XXX LOV_MAGIC etc check? */
107
108                 if (unlikely(ostid_id(&lmm->lmm_oi) == 0)) {
109                         CERROR("%s: zero lmm_object_id: rc = %d\n",
110                                exp->exp_obd->obd_name, -EINVAL);
111                         return -EINVAL;
112                 }
113         }
114
115         lsm_size = lov_stripe_md_size(1);
116         if (lsmp == NULL)
117                 return lsm_size;
118
119         if (*lsmp != NULL && lmm == NULL) {
120                 OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
121                 OBD_FREE(*lsmp, lsm_size);
122                 *lsmp = NULL;
123                 return 0;
124         }
125
126         if (*lsmp == NULL) {
127                 OBD_ALLOC(*lsmp, lsm_size);
128                 if (unlikely(*lsmp == NULL))
129                         return -ENOMEM;
130                 OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
131                 if (unlikely((*lsmp)->lsm_oinfo[0] == NULL)) {
132                         OBD_FREE(*lsmp, lsm_size);
133                         return -ENOMEM;
134                 }
135                 loi_init((*lsmp)->lsm_oinfo[0]);
136         } else if (unlikely(ostid_id(&(*lsmp)->lsm_oi) == 0)) {
137                 return -EBADF;
138         }
139
140         if (lmm != NULL)
141                 /* XXX zero *lsmp? */
142                 ostid_le_to_cpu(&lmm->lmm_oi, &(*lsmp)->lsm_oi);
143
144         if (imp != NULL &&
145             (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
146                 (*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
147         else
148                 (*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;
149
150         return lsm_size;
151 }
152
153 static inline void osc_pack_capa(struct ptlrpc_request *req,
154                                  struct ost_body *body, void *capa)
155 {
156         struct obd_capa *oc = (struct obd_capa *)capa;
157         struct lustre_capa *c;
158
159         if (!capa)
160                 return;
161
162         c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
163         LASSERT(c);
164         capa_cpy(c, oc);
165         body->oa.o_valid |= OBD_MD_FLOSSCAPA;
166         DEBUG_CAPA(D_SEC, c, "pack");
167 }
168
169 static inline void osc_pack_req_body(struct ptlrpc_request *req,
170                                      struct obd_info *oinfo)
171 {
172         struct ost_body *body;
173
174         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
175         LASSERT(body);
176
177         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
178                              oinfo->oi_oa);
179         osc_pack_capa(req, body, oinfo->oi_capa);
180 }
181
182 static inline void osc_set_capa_size(struct ptlrpc_request *req,
183                                      const struct req_msg_field *field,
184                                      struct obd_capa *oc)
185 {
186         if (oc == NULL)
187                 req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
188         else
189                 /* it is already calculated as sizeof struct obd_capa */
190                 ;
191 }
192
193 static int osc_getattr_interpret(const struct lu_env *env,
194                                  struct ptlrpc_request *req,
195                                  struct osc_async_args *aa, int rc)
196 {
197         struct ost_body *body;
198
199         if (rc != 0)
200                 GOTO(out, rc);
201
202         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
203         if (body) {
204                 CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
205                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
206                                      aa->aa_oi->oi_oa, &body->oa);
207
208                 /* This should really be sent by the OST */
209                 aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE;
210                 aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
211         } else {
212                 CDEBUG(D_INFO, "can't unpack ost_body\n");
213                 rc = -EPROTO;
214                 aa->aa_oi->oi_oa->o_valid = 0;
215         }
216 out:
217         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
218         return rc;
219 }
220
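/*
 * Queue an OST_GETATTR request on @set without waiting for the reply.
 * osc_getattr_interpret() later copies the returned attributes into
 * oinfo->oi_oa and invokes oinfo->oi_cb_up().
 */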
221 static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
222                              struct ptlrpc_request_set *set)
223 {
224         struct ptlrpc_request *req;
225         struct osc_async_args *aa;
226         int                 rc;
227
228         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
229         if (req == NULL)
230                 return -ENOMEM;
231
232         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
233         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
234         if (rc) {
235                 ptlrpc_request_free(req);
236                 return rc;
237         }
238
239         osc_pack_req_body(req, oinfo);
240
241         ptlrpc_request_set_replen(req);
242         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;
243
244         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
245         aa = ptlrpc_req_async_args(req);
246         aa->aa_oi = oinfo;
247
248         ptlrpc_set_add_req(set, req);
249         return 0;
250 }
251
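/*
 * Synchronous counterpart of osc_getattr_async(): sends OST_GETATTR with
 * ptlrpc_queue_wait() and fills oinfo->oi_oa from the reply.  Note that here
 * o_blksize is derived from cli_brw_size(), while the async interpret
 * callback above uses DT_MAX_BRW_SIZE.
 */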
252 static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
253                        struct obd_info *oinfo)
254 {
255         struct ptlrpc_request *req;
256         struct ost_body       *body;
257         int                 rc;
258
259         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
260         if (req == NULL)
261                 return -ENOMEM;
262
263         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
264         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
265         if (rc) {
266                 ptlrpc_request_free(req);
267                 return rc;
268         }
269
270         osc_pack_req_body(req, oinfo);
271
272         ptlrpc_request_set_replen(req);
273
274         rc = ptlrpc_queue_wait(req);
275         if (rc)
276                 GOTO(out, rc);
277
278         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
279         if (body == NULL)
280                 GOTO(out, rc = -EPROTO);
281
282         CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
283         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
284                              &body->oa);
285
286         oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd);
287         oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
288
289  out:
290         ptlrpc_req_finished(req);
291         return rc;
292 }
293
294 static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
295                        struct obd_info *oinfo, struct obd_trans_info *oti)
296 {
297         struct ptlrpc_request *req;
298         struct ost_body       *body;
299         int                 rc;
300
301         LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);
302
303         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
304         if (req == NULL)
305                 return -ENOMEM;
306
307         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
308         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
309         if (rc) {
310                 ptlrpc_request_free(req);
311                 return rc;
312         }
313
314         osc_pack_req_body(req, oinfo);
315
316         ptlrpc_request_set_replen(req);
317
318         rc = ptlrpc_queue_wait(req);
319         if (rc)
320                 GOTO(out, rc);
321
322         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
323         if (body == NULL)
324                 GOTO(out, rc = -EPROTO);
325
326         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
327                              &body->oa);
328
329 out:
330         ptlrpc_req_finished(req);
331         return rc;
332 }
333
334 static int osc_setattr_interpret(const struct lu_env *env,
335                                  struct ptlrpc_request *req,
336                                  struct osc_setattr_args *sa, int rc)
337 {
338         struct ost_body *body;
339
340         if (rc != 0)
341                 GOTO(out, rc);
342
343         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
344         if (body == NULL)
345                 GOTO(out, rc = -EPROTO);
346
347         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
348                              &body->oa);
349 out:
350         rc = sa->sa_upcall(sa->sa_cookie, rc);
351         return rc;
352 }
353
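/*
 * Pack and send an OST_SETATTR request.  With rqset == NULL the request is
 * handed to ptlrpcd and no reply is waited for; otherwise
 * osc_setattr_interpret() will run @upcall(@cookie, rc) once the reply (or
 * an error) arrives.  An llog cookie from @oti is copied into o_lcookie when
 * OBD_MD_FLCOOKIE is set.
 */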
354 int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
355                            struct obd_trans_info *oti,
356                            obd_enqueue_update_f upcall, void *cookie,
357                            struct ptlrpc_request_set *rqset)
358 {
359         struct ptlrpc_request   *req;
360         struct osc_setattr_args *sa;
361         int                   rc;
362
363         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
364         if (req == NULL)
365                 return -ENOMEM;
366
367         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
368         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
369         if (rc) {
370                 ptlrpc_request_free(req);
371                 return rc;
372         }
373
374         if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
375                 oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;
376
377         osc_pack_req_body(req, oinfo);
378
379         ptlrpc_request_set_replen(req);
380
381         /* do mds to ost setattr asynchronously */
382         if (!rqset) {
383                 /* Do not wait for response. */
384                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
385         } else {
386                 req->rq_interpret_reply =
387                         (ptlrpc_interpterer_t)osc_setattr_interpret;
388
389                 CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
390                 sa = ptlrpc_req_async_args(req);
391                 sa->sa_oa = oinfo->oi_oa;
392                 sa->sa_upcall = upcall;
393                 sa->sa_cookie = cookie;
394
395                 if (rqset == PTLRPCD_SET)
396                         ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
397                 else
398                         ptlrpc_set_add_req(rqset, req);
399         }
400
401         return 0;
402 }
403
404 static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
405                              struct obd_trans_info *oti,
406                              struct ptlrpc_request_set *rqset)
407 {
408         return osc_setattr_async_base(exp, oinfo, oti,
409                                       oinfo->oi_cb_up, oinfo, rqset);
410 }
411
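/*
 * Synchronously create an object on the OST.  Allocates a stripe md via
 * obd_alloc_memmd() if the caller did not pass one, marks DELORPHAN requests
 * as no_resend/no_delay, and on success copies the returned object id into
 * lsm->lsm_oi and records the transno (and llog cookie, if any) in @oti.
 */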
412 int osc_real_create(struct obd_export *exp, struct obdo *oa,
413                     struct lov_stripe_md **ea, struct obd_trans_info *oti)
414 {
415         struct ptlrpc_request *req;
416         struct ost_body       *body;
417         struct lov_stripe_md  *lsm;
418         int                 rc;
419
420         LASSERT(oa);
421         LASSERT(ea);
422
423         lsm = *ea;
424         if (!lsm) {
425                 rc = obd_alloc_memmd(exp, &lsm);
426                 if (rc < 0)
427                         return rc;
428         }
429
430         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
431         if (req == NULL)
432                 GOTO(out, rc = -ENOMEM);
433
434         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
435         if (rc) {
436                 ptlrpc_request_free(req);
437                 GOTO(out, rc);
438         }
439
440         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
441         LASSERT(body);
442
443         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
444
445         ptlrpc_request_set_replen(req);
446
447         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
448             oa->o_flags == OBD_FL_DELORPHAN) {
449                 DEBUG_REQ(D_HA, req,
450                           "delorphan from OST integration");
451                 /* Don't resend the delorphan req */
452                 req->rq_no_resend = req->rq_no_delay = 1;
453         }
454
455         rc = ptlrpc_queue_wait(req);
456         if (rc)
457                 GOTO(out_req, rc);
458
459         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
460         if (body == NULL)
461                 GOTO(out_req, rc = -EPROTO);
462
463         CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
464         lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);
465
466         oa->o_blksize = cli_brw_size(exp->exp_obd);
467         oa->o_valid |= OBD_MD_FLBLKSZ;
468
469         /* XXX LOV STACKING: the lsm that is passed to us from LOV does not
470          * have valid lsm_oinfo data structs, so don't go touching that.
471          * This needs to be fixed in a big way.
472          */
473         lsm->lsm_oi = oa->o_oi;
474         *ea = lsm;
475
476         if (oti != NULL) {
477                 oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);
478
479                 if (oa->o_valid & OBD_MD_FLCOOKIE) {
480                         if (!oti->oti_logcookies)
481                                 oti_alloc_cookies(oti, 1);
482                         *oti->oti_logcookies = oa->o_lcookie;
483                 }
484         }
485
486         CDEBUG(D_HA, "transno: "LPD64"\n",
487                lustre_msg_get_transno(req->rq_repmsg));
488 out_req:
489         ptlrpc_req_finished(req);
490 out:
491         if (rc && !*ea)
492                 obd_free_memmd(exp, &lsm);
493         return rc;
494 }
495
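/*
 * Send an OST_PUNCH (truncate) request.  The extent to punch is carried in
 * oa->o_size/o_blocks (see osc_punch() below, which copies
 * l_extent.start/end there).  Completion is reported through
 * osc_setattr_interpret(), which calls @upcall(@cookie, rc).
 */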
496 int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
497                    obd_enqueue_update_f upcall, void *cookie,
498                    struct ptlrpc_request_set *rqset)
499 {
500         struct ptlrpc_request   *req;
501         struct osc_setattr_args *sa;
502         struct ost_body  *body;
503         int                   rc;
504
505         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
506         if (req == NULL)
507                 return -ENOMEM;
508
509         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
510         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
511         if (rc) {
512                 ptlrpc_request_free(req);
513                 return rc;
514         }
515         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
516         ptlrpc_at_set_req_timeout(req);
517
518         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
519         LASSERT(body);
520         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
521                              oinfo->oi_oa);
522         osc_pack_capa(req, body, oinfo->oi_capa);
523
524         ptlrpc_request_set_replen(req);
525
526         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
527         CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
528         sa = ptlrpc_req_async_args(req);
529         sa->sa_oa     = oinfo->oi_oa;
530         sa->sa_upcall = upcall;
531         sa->sa_cookie = cookie;
532         if (rqset == PTLRPCD_SET)
533                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
534         else
535                 ptlrpc_set_add_req(rqset, req);
536
537         return 0;
538 }
539
540 static int osc_punch(const struct lu_env *env, struct obd_export *exp,
541                      struct obd_info *oinfo, struct obd_trans_info *oti,
542                      struct ptlrpc_request_set *rqset)
543 {
544         oinfo->oi_oa->o_size   = oinfo->oi_policy.l_extent.start;
545         oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
546         oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
547         return osc_punch_base(exp, oinfo,
548                               oinfo->oi_cb_up, oinfo, rqset);
549 }
550
551 static int osc_sync_interpret(const struct lu_env *env,
552                               struct ptlrpc_request *req,
553                               void *arg, int rc)
554 {
555         struct osc_fsync_args *fa = arg;
556         struct ost_body *body;
557
558         if (rc)
559                 GOTO(out, rc);
560
561         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
562         if (body == NULL) {
563                 CERROR("can't unpack ost_body\n");
564                 GOTO(out, rc = -EPROTO);
565         }
566
567         *fa->fa_oi->oi_oa = body->oa;
568 out:
569         rc = fa->fa_upcall(fa->fa_cookie, rc);
570         return rc;
571 }
572
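/*
 * Send an OST_SYNC request for the [start, end] range that osc_sync() packs
 * into oa->o_size/o_blocks.  osc_sync_interpret() copies the reply's obdo
 * back into oinfo->oi_oa and then calls @upcall(@cookie, rc).
 */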
573 int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
574                   obd_enqueue_update_f upcall, void *cookie,
575                   struct ptlrpc_request_set *rqset)
576 {
577         struct ptlrpc_request *req;
578         struct ost_body       *body;
579         struct osc_fsync_args *fa;
580         int                 rc;
581
582         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
583         if (req == NULL)
584                 return -ENOMEM;
585
586         osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
587         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
588         if (rc) {
589                 ptlrpc_request_free(req);
590                 return rc;
591         }
592
593         /* overload the size and blocks fields in the oa with start/end */
594         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
595         LASSERT(body);
596         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
597                              oinfo->oi_oa);
598         osc_pack_capa(req, body, oinfo->oi_capa);
599
600         ptlrpc_request_set_replen(req);
601         req->rq_interpret_reply = osc_sync_interpret;
602
603         CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
604         fa = ptlrpc_req_async_args(req);
605         fa->fa_oi = oinfo;
606         fa->fa_upcall = upcall;
607         fa->fa_cookie = cookie;
608
609         if (rqset == PTLRPCD_SET)
610                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
611         else
612                 ptlrpc_set_add_req(rqset, req);
613
614         return 0;
615 }
616
617 static int osc_sync(const struct lu_env *env, struct obd_export *exp,
618                     struct obd_info *oinfo, obd_size start, obd_size end,
619                     struct ptlrpc_request_set *set)
620 {
621         if (!oinfo->oi_oa) {
622                 CDEBUG(D_INFO, "oa NULL\n");
623                 return -EINVAL;
624         }
625
626         oinfo->oi_oa->o_size = start;
627         oinfo->oi_oa->o_blocks = end;
628         oinfo->oi_oa->o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);
629
630         return osc_sync_base(exp, oinfo, oinfo->oi_cb_up, oinfo, set);
631 }
632
633 /* Find and locally cancel the locks matched by @mode in the resource derived
634  * from the object id in @oa. Found locks are added to the @cancels list.
635  * Returns the number of locks added to @cancels. */
636 static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
637                                    struct list_head *cancels,
638                                    ldlm_mode_t mode, __u64 lock_flags)
639 {
640         struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
641         struct ldlm_res_id res_id;
642         struct ldlm_resource *res;
643         int count;
644
645         /* Return, i.e. cancel nothing, only if ELC is supported (flag in
646          * export) but disabled through procfs (flag in NS).
647          *
648          * This is different from the case where ELC was never supported: there
649          * we still want to cancel locks in advance, we just cancel them
650          * locally without sending any RPC. */
651         if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
652                 return 0;
653
654         ostid_build_res_name(&oa->o_oi, &res_id);
655         res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
656         if (res == NULL)
657                 return 0;
658
659         LDLM_RESOURCE_ADDREF(res);
660         count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
661                                            lock_flags, 0, NULL);
662         LDLM_RESOURCE_DELREF(res);
663         ldlm_resource_putref(res);
664         return count;
665 }
666
667 static int osc_destroy_interpret(const struct lu_env *env,
668                                  struct ptlrpc_request *req, void *data,
669                                  int rc)
670 {
671         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
672
673         atomic_dec(&cli->cl_destroy_in_flight);
674         wake_up(&cli->cl_destroy_waitq);
675         return 0;
676 }
677
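/*
 * Try to reserve a slot for one more in-flight OST_DESTROY RPC.  Returns 1 if
 * the reservation succeeded (the counter stays within cl_max_rpcs_in_flight);
 * otherwise the increment is undone, any waiter is woken if the count dropped
 * back below the limit in the meantime, and 0 is returned.
 */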
678 static int osc_can_send_destroy(struct client_obd *cli)
679 {
680         if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
681             cli->cl_max_rpcs_in_flight) {
682                 /* The destroy request can be sent */
683                 return 1;
684         }
685         if (atomic_dec_return(&cli->cl_destroy_in_flight) <
686             cli->cl_max_rpcs_in_flight) {
687                 /*
688                  * The counter has been modified between the two atomic
689                  * operations.
690                  */
691                 wake_up(&cli->cl_destroy_waitq);
692         }
693         return 0;
694 }
695
696 int osc_create(const struct lu_env *env, struct obd_export *exp,
697                struct obdo *oa, struct lov_stripe_md **ea,
698                struct obd_trans_info *oti)
699 {
700         int rc = 0;
701
702         LASSERT(oa);
703         LASSERT(ea);
704         LASSERT(oa->o_valid & OBD_MD_FLGROUP);
705
706         if ((oa->o_valid & OBD_MD_FLFLAGS) &&
707             oa->o_flags == OBD_FL_RECREATE_OBJS) {
708                 return osc_real_create(exp, oa, ea, oti);
709         }
710
711         if (!fid_seq_is_mdt(ostid_seq(&oa->o_oi)))
712                 return osc_real_create(exp, oa, ea, oti);
713
714         /* we should not get here anymore */
715         LBUG();
716
717         return rc;
718 }
719
720 /* Destroy requests can always be async on the client, and we don't really
721  * care about the return code since the client cannot do anything at all about
722  * a destroy failure.
723  * When the MDS is unlinking a filename, it saves the file objects into a
724  * recovery llog, and these object records are cancelled when the OST reports
725  * they were destroyed and sync'd to disk (i.e. transaction committed).
726  * If the client dies, or the OST is down when the object should be destroyed,
727  * the records are not cancelled, and when the OST next reconnects to the MDS,
728  * it retrieves the llog unlink records and then sends the log cancellation
729  * cookies to the MDS after committing the destroy transactions. */
730 static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
731                        struct obdo *oa, struct lov_stripe_md *ea,
732                        struct obd_trans_info *oti, struct obd_export *md_export,
733                        void *capa)
734 {
735         struct client_obd     *cli = &exp->exp_obd->u.cli;
736         struct ptlrpc_request *req;
737         struct ost_body       *body;
738         LIST_HEAD(cancels);
739         int rc, count;
740
741         if (!oa) {
742                 CDEBUG(D_INFO, "oa NULL\n");
743                 return -EINVAL;
744         }
745
746         count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
747                                         LDLM_FL_DISCARD_DATA);
748
749         req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
750         if (req == NULL) {
751                 ldlm_lock_list_put(&cancels, l_bl_ast, count);
752                 return -ENOMEM;
753         }
754
755         osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
756         rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
757                                0, &cancels, count);
758         if (rc) {
759                 ptlrpc_request_free(req);
760                 return rc;
761         }
762
763         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
764         ptlrpc_at_set_req_timeout(req);
765
766         if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
767                 oa->o_lcookie = *oti->oti_logcookies;
768         body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
769         LASSERT(body);
770         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
771
772         osc_pack_capa(req, body, (struct obd_capa *)capa);
773         ptlrpc_request_set_replen(req);
774
775         /* If osc_destroy is destroying an unlink orphan on behalf of the
776          * MDT, it must not block here: the request may have been issued
777          * from a ptlrpcd thread, and it is not good to block a ptlrpcd
778          * thread (b=16006). */
779         if (!(oa->o_flags & OBD_FL_DELORPHAN)) {
780                 req->rq_interpret_reply = osc_destroy_interpret;
781                 if (!osc_can_send_destroy(cli)) {
782                         struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
783                                                           NULL);
784
785                         /*
786                          * Wait until the number of on-going destroy RPCs drops
787                          * under max_rpc_in_flight
788                          */
789                         l_wait_event_exclusive(cli->cl_destroy_waitq,
790                                                osc_can_send_destroy(cli), &lwi);
791                 }
792         }
793
794         /* Do not wait for response */
795         ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
796         return 0;
797 }
798
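/*
 * Fill the cache/grant fields of @oa (o_dirty, o_undirty, o_grant, o_dropped)
 * from the client_obd counters under cl_loi_list_lock; callers use this to
 * piggy-back the client's dirty and grant state on outgoing requests.
 */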
799 static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
800                                 long writing_bytes)
801 {
802         obd_flag bits = OBD_MD_FLBLOCKS|OBD_MD_FLGRANT;
803
804         LASSERT(!(oa->o_valid & bits));
805
806         oa->o_valid |= bits;
807         client_obd_list_lock(&cli->cl_loi_list_lock);
808         oa->o_dirty = cli->cl_dirty;
809         if (unlikely(cli->cl_dirty - cli->cl_dirty_transit >
810                      cli->cl_dirty_max)) {
811                 CERROR("dirty %lu - %lu > dirty_max %lu\n",
812                        cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
813                 oa->o_undirty = 0;
814         } else if (unlikely(atomic_read(&obd_dirty_pages) -
815                             atomic_read(&obd_dirty_transit_pages) >
816                             (long)(obd_max_dirty_pages + 1))) {
817                 /* The atomic_read() that gates the atomic_inc() is not
818                  * covered by a lock, so they may race and trip this
819                  * CERROR() unless we add in a small fudge factor (+1). */
820                 CERROR("dirty %d - %d > system dirty_max %d\n",
821                        atomic_read(&obd_dirty_pages),
822                        atomic_read(&obd_dirty_transit_pages),
823                        obd_max_dirty_pages);
824                 oa->o_undirty = 0;
825         } else if (unlikely(cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff)) {
826                 CERROR("dirty %lu - dirty_max %lu too big???\n",
827                        cli->cl_dirty, cli->cl_dirty_max);
828                 oa->o_undirty = 0;
829         } else {
830                 long max_in_flight = (cli->cl_max_pages_per_rpc <<
831                                       PAGE_CACHE_SHIFT)*
832                                      (cli->cl_max_rpcs_in_flight + 1);
833                 oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
834         }
835         oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
836         oa->o_dropped = cli->cl_lost_grant;
837         cli->cl_lost_grant = 0;
838         client_obd_list_unlock(&cli->cl_loi_list_lock);
839         CDEBUG(D_CACHE, "dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
840                oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
841
842 }
843
844 void osc_update_next_shrink(struct client_obd *cli)
845 {
846         cli->cl_next_shrink_grant =
847                 cfs_time_shift(cli->cl_grant_shrink_interval);
848         CDEBUG(D_CACHE, "next time %ld to shrink grant\n",
849                cli->cl_next_shrink_grant);
850 }
851
852 static void __osc_update_grant(struct client_obd *cli, obd_size grant)
853 {
854         client_obd_list_lock(&cli->cl_loi_list_lock);
855         cli->cl_avail_grant += grant;
856         client_obd_list_unlock(&cli->cl_loi_list_lock);
857 }
858
859 static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
860 {
861         if (body->oa.o_valid & OBD_MD_FLGRANT) {
862                 CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
863                 __osc_update_grant(cli, body->oa.o_grant);
864         }
865 }
866
867 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
868                               obd_count keylen, void *key, obd_count vallen,
869                               void *val, struct ptlrpc_request_set *set);
870
871 static int osc_shrink_grant_interpret(const struct lu_env *env,
872                                       struct ptlrpc_request *req,
873                                       void *aa, int rc)
874 {
875         struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
876         struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
877         struct ost_body *body;
878
879         if (rc != 0) {
880                 __osc_update_grant(cli, oa->o_grant);
881                 GOTO(out, rc);
882         }
883
884         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
885         LASSERT(body);
886         osc_update_grant(cli, body);
887 out:
888         OBDO_FREE(oa);
889         return rc;
890 }
891
892 static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
893 {
894         client_obd_list_lock(&cli->cl_loi_list_lock);
895         oa->o_grant = cli->cl_avail_grant / 4;
896         cli->cl_avail_grant -= oa->o_grant;
897         client_obd_list_unlock(&cli->cl_loi_list_lock);
898         if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
899                 oa->o_valid |= OBD_MD_FLFLAGS;
900                 oa->o_flags = 0;
901         }
902         oa->o_flags |= OBD_FL_SHRINK_GRANT;
903         osc_update_next_shrink(cli);
904 }
905
906 /* Shrink the current grant, either from some large amount to enough for a
907  * full set of in-flight RPCs, or if we have already shrunk to that limit
908  * then to enough for a single RPC.  This avoids keeping more grant than
909  * needed, and avoids shrinking the grant piecemeal. */
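/*
 * Illustrative example (values assumed, not taken from this file): with
 * cl_max_pages_per_rpc = 256 (1 MiB RPCs on 4 KiB pages) and
 * cl_max_rpcs_in_flight = 8, the first shrink target below is
 * (8 + 1) * 1 MiB = 9 MiB; once avail_grant is at or below that, the target
 * drops to a single 1 MiB RPC.
 */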
910 static int osc_shrink_grant(struct client_obd *cli)
911 {
912         __u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
913                              (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);
914
915         client_obd_list_lock(&cli->cl_loi_list_lock);
916         if (cli->cl_avail_grant <= target_bytes)
917                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
918         client_obd_list_unlock(&cli->cl_loi_list_lock);
919
920         return osc_shrink_grant_to_target(cli, target_bytes);
921 }
922
923 int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
924 {
925         int                     rc = 0;
926         struct ost_body *body;
927
928         client_obd_list_lock(&cli->cl_loi_list_lock);
929         /* Don't shrink if we are already above or below the desired limit.
930          * We don't want to shrink below a single RPC, as that will negatively
931          * impact block allocation and long-term performance. */
932         if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
933                 target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
934
935         if (target_bytes >= cli->cl_avail_grant) {
936                 client_obd_list_unlock(&cli->cl_loi_list_lock);
937                 return 0;
938         }
939         client_obd_list_unlock(&cli->cl_loi_list_lock);
940
941         OBD_ALLOC_PTR(body);
942         if (!body)
943                 return -ENOMEM;
944
945         osc_announce_cached(cli, &body->oa, 0);
946
947         client_obd_list_lock(&cli->cl_loi_list_lock);
948         body->oa.o_grant = cli->cl_avail_grant - target_bytes;
949         cli->cl_avail_grant = target_bytes;
950         client_obd_list_unlock(&cli->cl_loi_list_lock);
951         if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
952                 body->oa.o_valid |= OBD_MD_FLFLAGS;
953                 body->oa.o_flags = 0;
954         }
955         body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
956         osc_update_next_shrink(cli);
957
958         rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
959                                 sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
960                                 sizeof(*body), body, NULL);
961         if (rc != 0)
962                 __osc_update_grant(cli, body->oa.o_grant);
963         OBD_FREE_PTR(body);
964         return rc;
965 }
966
967 static int osc_should_shrink_grant(struct client_obd *client)
968 {
969         cfs_time_t time = cfs_time_current();
970         cfs_time_t next_shrink = client->cl_next_shrink_grant;
971
972         if ((client->cl_import->imp_connect_data.ocd_connect_flags &
973              OBD_CONNECT_GRANT_SHRINK) == 0)
974                 return 0;
975
976         if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
977                 /* Get the current RPC size directly, instead of going via:
978                  * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
979                  * Keep comment here so that it can be found by searching. */
980                 int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
981
982                 if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
983                     client->cl_avail_grant > brw_size)
984                         return 1;
985                 else
986                         osc_update_next_shrink(client);
987         }
988         return 0;
989 }
990
991 static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
992 {
993         struct client_obd *client;
994
995         list_for_each_entry(client, &item->ti_obd_list,
996                                 cl_grant_shrink_list) {
997                 if (osc_should_shrink_grant(client))
998                         osc_shrink_grant(client);
999         }
1000         return 0;
1001 }
1002
1003 static int osc_add_shrink_grant(struct client_obd *client)
1004 {
1005         int rc;
1006
1007         rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
1008                                        TIMEOUT_GRANT,
1009                                        osc_grant_shrink_grant_cb, NULL,
1010                                        &client->cl_grant_shrink_list);
1011         if (rc) {
1012                 CERROR("add grant client %s error %d\n",
1013                         client->cl_import->imp_obd->obd_name, rc);
1014                 return rc;
1015         }
1016         CDEBUG(D_CACHE, "add grant client %s\n",
1017                client->cl_import->imp_obd->obd_name);
1018         osc_update_next_shrink(client);
1019         return 0;
1020 }
1021
1022 static int osc_del_shrink_grant(struct client_obd *client)
1023 {
1024         return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
1025                                          TIMEOUT_GRANT);
1026 }
1027
1028 static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
1029 {
1030         /*
1031          * ocd_grant is the total grant amount we expect to hold: if we've
1032          * been evicted, it's the new avail_grant amount, and cl_dirty will
1033          * drop to 0 as in-flight RPCs fail out; otherwise it's avail_grant + dirty.
1034          *
1035          * A race is tolerable here: if we're evicted but imp_state has already
1036          * left the EVICTED state, then cl_dirty must already be 0.
1037          */
1038         client_obd_list_lock(&cli->cl_loi_list_lock);
1039         if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
1040                 cli->cl_avail_grant = ocd->ocd_grant;
1041         else
1042                 cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;
1043
1044         if (cli->cl_avail_grant < 0) {
1045                 CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
1046                       cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant,
1047                       ocd->ocd_grant, cli->cl_dirty);
1048                 /* workaround for servers which do not have the patch from
1049                  * LU-2679 */
1050                 cli->cl_avail_grant = ocd->ocd_grant;
1051         }
1052
1053         /* determine the appropriate chunk size used by osc_extent. */
1054         cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize);
1055         client_obd_list_unlock(&cli->cl_loi_list_lock);
1056
1057         CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld chunk bits: %d\n",
1058                 cli->cl_import->imp_obd->obd_name,
1059                 cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);
1060
1061         if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
1062             list_empty(&cli->cl_grant_shrink_list))
1063                 osc_add_shrink_grant(cli);
1064 }
1065
1066 /* We assume that the reason this OSC got a short read is that it read
1067  * beyond the end of a stripe file; i.e. Lustre is reading a sparse file
1068  * via the LOV, and it _knows_ it's reading inside the file, it's just that
1069  * this stripe never got written at or beyond this stripe offset yet. */
1070 static void handle_short_read(int nob_read, obd_count page_count,
1071                               struct brw_page **pga)
1072 {
1073         char *ptr;
1074         int i = 0;
1075
1076         /* skip bytes read OK */
1077         while (nob_read > 0) {
1078                 LASSERT(page_count > 0);
1079
1080                 if (pga[i]->count > nob_read) {
1081                         /* EOF inside this page */
1082                         ptr = kmap(pga[i]->pg) +
1083                                 (pga[i]->off & ~CFS_PAGE_MASK);
1084                         memset(ptr + nob_read, 0, pga[i]->count - nob_read);
1085                         kunmap(pga[i]->pg);
1086                         page_count--;
1087                         i++;
1088                         break;
1089                 }
1090
1091                 nob_read -= pga[i]->count;
1092                 page_count--;
1093                 i++;
1094         }
1095
1096         /* zero remaining pages */
1097         while (page_count-- > 0) {
1098                 ptr = kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
1099                 memset(ptr, 0, pga[i]->count);
1100                 kunmap(pga[i]->pg);
1101                 i++;
1102         }
1103 }
1104
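/*
 * Validate the per-niobuf return codes in a BRW_WRITE reply: a negative rc is
 * propagated to the caller, while any other non-zero rc or a bulk transfer
 * shorter than requested is treated as a protocol error (-EPROTO).
 */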
1105 static int check_write_rcs(struct ptlrpc_request *req,
1106                            int requested_nob, int niocount,
1107                            obd_count page_count, struct brw_page **pga)
1108 {
1109         int     i;
1110         __u32   *remote_rcs;
1111
1112         remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
1113                                                   sizeof(*remote_rcs) *
1114                                                   niocount);
1115         if (remote_rcs == NULL) {
1116                 CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
1117                 return -EPROTO;
1118         }
1119
1120         /* return error if any niobuf was in error */
1121         for (i = 0; i < niocount; i++) {
1122                 if ((int)remote_rcs[i] < 0)
1123                         return remote_rcs[i];
1124
1125                 if (remote_rcs[i] != 0) {
1126                         CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
1127                                 i, remote_rcs[i], req);
1128                         return -EPROTO;
1129                 }
1130         }
1131
1132         if (req->rq_bulk->bd_nob_transferred != requested_nob) {
1133                 CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
1134                        req->rq_bulk->bd_nob_transferred, requested_nob);
1135                 return -EPROTO;
1136         }
1137
1138         return 0;
1139 }
1140
1141 static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
1142 {
1143         if (p1->flag != p2->flag) {
1144                 unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
1145                                   OBD_BRW_SYNC | OBD_BRW_ASYNC | OBD_BRW_NOQUOTA);
1146
1147                 /* warn if we try to combine flags that we don't know to be
1148                  * safe to combine */
1149                 if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
1150                         CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
1151                               "report this at http://bugs.whamcloud.com/\n",
1152                               p1->flag, p2->flag);
1153                 }
1154                 return 0;
1155         }
1156
1157         return (p1->off + p1->count == p2->off);
1158 }
1159
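/*
 * Checksum the first @nob bytes of the bulk pages with the libcfs crypto hash
 * selected by @cksum_type.  Under fault injection this deliberately corrupts
 * the first read page (OBD_FAIL_OSC_CHECKSUM_RECEIVE) or returns an
 * off-by-one checksum for writes (OBD_FAIL_OSC_CHECKSUM_SEND) so the
 * resend/verification paths can be exercised.
 */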
1160 static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
1161                                    struct brw_page **pga, int opc,
1162                                    cksum_type_t cksum_type)
1163 {
1164         __u32                           cksum;
1165         int                             i = 0;
1166         struct cfs_crypto_hash_desc     *hdesc;
1167         unsigned int                    bufsize;
1168         int                             err;
1169         unsigned char                   cfs_alg = cksum_obd2cfs(cksum_type);
1170
1171         LASSERT(pg_count > 0);
1172
1173         hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
1174         if (IS_ERR(hdesc)) {
1175                 CERROR("Unable to initialize checksum hash %s\n",
1176                        cfs_crypto_hash_name(cfs_alg));
1177                 return PTR_ERR(hdesc);
1178         }
1179
1180         while (nob > 0 && pg_count > 0) {
1181                 int count = pga[i]->count > nob ? nob : pga[i]->count;
1182
1183                 /* corrupt the data before we compute the checksum, to
1184                  * simulate an OST->client data error */
1185                 if (i == 0 && opc == OST_READ &&
1186                     OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
1187                         unsigned char *ptr = kmap(pga[i]->pg);
1188                         int off = pga[i]->off & ~CFS_PAGE_MASK;
1189                         memcpy(ptr + off, "bad1", min(4, nob));
1190                         kunmap(pga[i]->pg);
1191                 }
1192                 cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
1193                                   pga[i]->off & ~CFS_PAGE_MASK,
1194                                   count);
1195                 CDEBUG(D_PAGE,
1196                        "page %p map %p index %lu flags %lx count %u priv %0lx: off %d\n",
1197                        pga[i]->pg, pga[i]->pg->mapping, pga[i]->pg->index,
1198                        (long)pga[i]->pg->flags, page_count(pga[i]->pg),
1199                        page_private(pga[i]->pg),
1200                        (int)(pga[i]->off & ~CFS_PAGE_MASK));
1201
1202                 nob -= pga[i]->count;
1203                 pg_count--;
1204                 i++;
1205         }
1206
1207         bufsize = 4;
1208         err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);
1209
1210         if (err)
1211                 cfs_crypto_hash_final(hdesc, NULL, NULL);
1212
1213         /* For sends we only compute a wrong checksum instead of
1214          * corrupting the data, so the data is still correct on a redo. */
1215         if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
1216                 cksum++;
1217
1218         return cksum;
1219 }
1220
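/*
 * Build (but do not send) a bulk OST_READ/OST_WRITE request for @page_count
 * pages: contiguous pages are merged into a single niobuf, cached/dirty state
 * is announced via osc_announce_cached(), and for writes a bulk checksum is
 * attached when cl_checksum is enabled and the sptlrpc flavor does not
 * already checksum bulk data.  The osc_brw_async_args stashed in the request
 * describe the transfer for the completion path (see brw_interpret(),
 * forward-declared above).
 */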
1221 static int osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
1222                                 struct lov_stripe_md *lsm, obd_count page_count,
1223                                 struct brw_page **pga,
1224                                 struct ptlrpc_request **reqp,
1225                                 struct obd_capa *ocapa, int reserve,
1226                                 int resend)
1227 {
1228         struct ptlrpc_request   *req;
1229         struct ptlrpc_bulk_desc *desc;
1230         struct ost_body  *body;
1231         struct obd_ioobj        *ioobj;
1232         struct niobuf_remote    *niobuf;
1233         int niocount, i, requested_nob, opc, rc;
1234         struct osc_brw_async_args *aa;
1235         struct req_capsule      *pill;
1236         struct brw_page *pg_prev;
1237
1238         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
1239                 return -ENOMEM; /* Recoverable */
1240         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
1241                 return -EINVAL; /* Fatal */
1242
1243         if ((cmd & OBD_BRW_WRITE) != 0) {
1244                 opc = OST_WRITE;
1245                 req = ptlrpc_request_alloc_pool(cli->cl_import,
1246                                                 cli->cl_import->imp_rq_pool,
1247                                                 &RQF_OST_BRW_WRITE);
1248         } else {
1249                 opc = OST_READ;
1250                 req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
1251         }
1252         if (req == NULL)
1253                 return -ENOMEM;
1254
1255         for (niocount = i = 1; i < page_count; i++) {
1256                 if (!can_merge_pages(pga[i - 1], pga[i]))
1257                         niocount++;
1258         }
1259
1260         pill = &req->rq_pill;
1261         req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
1262                              sizeof(*ioobj));
1263         req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
1264                              niocount * sizeof(*niobuf));
1265         osc_set_capa_size(req, &RMF_CAPA1, ocapa);
1266
1267         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
1268         if (rc) {
1269                 ptlrpc_request_free(req);
1270                 return rc;
1271         }
1272         req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
1273         ptlrpc_at_set_req_timeout(req);
1274         /* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
1275          * retry logic */
1276         req->rq_no_retry_einprogress = 1;
1277
1278         desc = ptlrpc_prep_bulk_imp(req, page_count,
1279                 cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
1280                 opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK,
1281                 OST_BULK_PORTAL);
1282
1283         if (desc == NULL)
1284                 GOTO(out, rc = -ENOMEM);
1285         /* NB request now owns desc and will free it when it gets freed */
1286
1287         body = req_capsule_client_get(pill, &RMF_OST_BODY);
1288         ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
1289         niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
1290         LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);
1291
1292         lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);
1293
1294         obdo_to_ioobj(oa, ioobj);
1295         ioobj->ioo_bufcnt = niocount;
1296         /* The high bits of ioo_max_brw tell the server the _maximum_ number of
1297          * bulks that might be sent for this request.  The actual number is
1298          * decided when the RPC is finally sent in ptlrpc_register_bulk(). It
1299          * sends "max - 1" for compatibility with old clients that send "0",
1300          * and also so the actual maximum is a power-of-two number, not one less. LU-1431 */
1301         ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
1302         osc_pack_capa(req, body, ocapa);
1303         LASSERT(page_count > 0);
1304         pg_prev = pga[0];
1305         for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
1306                 struct brw_page *pg = pga[i];
1307                 int poff = pg->off & ~CFS_PAGE_MASK;
1308
1309                 LASSERT(pg->count > 0);
1310                 /* make sure there is no gap in the middle of page array */
1311                 LASSERTF(page_count == 1 ||
1312                          (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
1313                           ergo(i > 0 && i < page_count - 1,
1314                                poff == 0 && pg->count == PAGE_CACHE_SIZE)   &&
1315                           ergo(i == page_count - 1, poff == 0)),
1316                          "i: %d/%d pg: %p off: "LPU64", count: %u\n",
1317                          i, page_count, pg, pg->off, pg->count);
1318                 LASSERTF(i == 0 || pg->off > pg_prev->off,
1319                          "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
1320                          " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
1321                          i, page_count,
1322                          pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
1323                          pg_prev->pg, page_private(pg_prev->pg),
1324                          pg_prev->pg->index, pg_prev->off);
1325                 LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
1326                         (pg->flag & OBD_BRW_SRVLOCK));
1327
1328                 ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
1329                 requested_nob += pg->count;
1330
1331                 if (i > 0 && can_merge_pages(pg_prev, pg)) {
1332                         niobuf--;
1333                         niobuf->len += pg->count;
1334                 } else {
1335                         niobuf->offset = pg->off;
1336                         niobuf->len    = pg->count;
1337                         niobuf->flags  = pg->flag;
1338                 }
1339                 pg_prev = pg;
1340         }
1341
1342         LASSERTF((void *)(niobuf - niocount) ==
1343                 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
1344                 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
1345                 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));
1346
1347         osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob : 0);
1348         if (resend) {
1349                 if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1350                         body->oa.o_valid |= OBD_MD_FLFLAGS;
1351                         body->oa.o_flags = 0;
1352                 }
1353                 body->oa.o_flags |= OBD_FL_RECOV_RESEND;
1354         }
1355
1356         if (osc_should_shrink_grant(cli))
1357                 osc_shrink_grant_local(cli, &body->oa);
1358
1359         /* size[REQ_REC_OFF] still sizeof (*body) */
1360         if (opc == OST_WRITE) {
1361                 if (cli->cl_checksum &&
1362                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1363                         /* store cl_cksum_type in a local variable since
1364                          * it can be changed via lprocfs */
1365                         cksum_type_t cksum_type = cli->cl_cksum_type;
1366
1367                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
1368                                 oa->o_flags &= OBD_FL_LOCAL_MASK;
1369                                 body->oa.o_flags = 0;
1370                         }
1371                         body->oa.o_flags |= cksum_type_pack(cksum_type);
1372                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1373                         body->oa.o_cksum = osc_checksum_bulk(requested_nob,
1374                                                              page_count, pga,
1375                                                              OST_WRITE,
1376                                                              cksum_type);
1377                         CDEBUG(D_PAGE, "checksum at write origin: %x\n",
1378                                body->oa.o_cksum);
1379                         /* save this in 'oa', too, for later checking */
1380                         oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1381                         oa->o_flags |= cksum_type_pack(cksum_type);
1382                 } else {
1383                         /* clear out the checksum flag, in case this is a
1384                          * resend but cl_checksum is no longer set. b=11238 */
1385                         oa->o_valid &= ~OBD_MD_FLCKSUM;
1386                 }
1387                 oa->o_cksum = body->oa.o_cksum;
1388                 /* 1 RC per niobuf */
1389                 req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
1390                                      sizeof(__u32) * niocount);
1391         } else {
1392                 if (cli->cl_checksum &&
1393                     !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
1394                         if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
1395                                 body->oa.o_flags = 0;
1396                         body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
1397                         body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
1398                 }
1399         }
1400         ptlrpc_request_set_replen(req);
1401
1402         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
1403         aa = ptlrpc_req_async_args(req);
1404         aa->aa_oa = oa;
1405         aa->aa_requested_nob = requested_nob;
1406         aa->aa_nio_count = niocount;
1407         aa->aa_page_count = page_count;
1408         aa->aa_resends = 0;
1409         aa->aa_ppga = pga;
1410         aa->aa_cli = cli;
1411         INIT_LIST_HEAD(&aa->aa_oaps);
1412         if (ocapa && reserve)
1413                 aa->aa_ocapa = capa_get(ocapa);
1414
1415         *reqp = req;
1416         return 0;
1417
1418  out:
1419         ptlrpc_req_finished(req);
1420         return rc;
1421 }
1422
1423 static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
1424                                 __u32 client_cksum, __u32 server_cksum, int nob,
1425                                 obd_count page_count, struct brw_page **pga,
1426                                 cksum_type_t client_cksum_type)
1427 {
1428         __u32 new_cksum;
1429         char *msg;
1430         cksum_type_t cksum_type;
1431
1432         if (server_cksum == client_cksum) {
1433                 CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1434                 return 0;
1435         }
1436
1437         cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
1438                                        oa->o_flags : 0);
1439         new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
1440                                       cksum_type);
1441
1442         if (cksum_type != client_cksum_type)
1443                 msg = "the server did not use the checksum type specified in "
1444                       "the original request - likely a protocol problem";
1445         else if (new_cksum == server_cksum)
1446                 msg = "changed on the client after we checksummed it - "
1447                       "likely false positive due to mmap IO (bug 11742)";
1448         else if (new_cksum == client_cksum)
1449                 msg = "changed in transit before arrival at OST";
1450         else
1451                 msg = "changed in transit AND doesn't match the original - "
1452                       "likely false positive due to mmap IO (bug 11742)";
1453
1454         LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
1455                            " object "DOSTID" extent ["LPU64"-"LPU64"]\n",
1456                            msg, libcfs_nid2str(peer->nid),
1457                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
1458                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
1459                            oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
1460                            POSTID(&oa->o_oi), pga[0]->off,
1461                            pga[page_count-1]->off + pga[page_count-1]->count - 1);
1462         CERROR("original client csum %x (type %x), server csum %x (type %x), "
1463                "client csum now %x\n", client_cksum, client_cksum_type,
1464                server_cksum, cksum_type, new_cksum);
1465         return 1;
1466 }
1467
1468 /* Note rc enters this function as number of bytes transferred */
1469 static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
1470 {
1471         struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
1472         const lnet_process_id_t *peer =
1473                         &req->rq_import->imp_connection->c_peer;
1474         struct client_obd *cli = aa->aa_cli;
1475         struct ost_body *body;
1476         __u32 client_cksum = 0;
1477
1478         if (rc < 0 && rc != -EDQUOT) {
1479                 DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
1480                 return rc;
1481         }
1482
1483         LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
1484         body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
1485         if (body == NULL) {
1486                 DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
1487                 return -EPROTO;
1488         }
1489
1490         /* set/clear over quota flag for a uid/gid */
1491         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
1492             body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
1493                 unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };
1494
1495                 CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
1496                        body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
1497                        body->oa.o_flags);
1498                 osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
1499         }
1500
1501         osc_update_grant(cli, body);
1502
1503         if (rc < 0)
1504                 return rc;
1505
1506         if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
1507                 client_cksum = aa->aa_oa->o_cksum; /* save for later */
1508
1509         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
1510                 if (rc > 0) {
1511                         CERROR("Unexpected +ve rc %d\n", rc);
1512                         return -EPROTO;
1513                 }
1514                 LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);
1515
1516                 if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
1517                         return -EAGAIN;
1518
1519                 if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
1520                     check_write_checksum(&body->oa, peer, client_cksum,
1521                                          body->oa.o_cksum, aa->aa_requested_nob,
1522                                          aa->aa_page_count, aa->aa_ppga,
1523                                          cksum_type_unpack(aa->aa_oa->o_flags)))
1524                         return -EAGAIN;
1525
1526                 rc = check_write_rcs(req, aa->aa_requested_nob, aa->aa_nio_count,
1527                                      aa->aa_page_count, aa->aa_ppga);
1528                 GOTO(out, rc);
1529         }
1530
1531         /* The rest of this function executes only for OST_READs */
1532
1533         /* if unwrap_bulk failed, return -EAGAIN to retry */
1534         rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
1535         if (rc < 0)
1536                 GOTO(out, rc = -EAGAIN);
1537
1538         if (rc > aa->aa_requested_nob) {
1539                 CERROR("Unexpected rc %d (%d requested)\n", rc,
1540                        aa->aa_requested_nob);
1541                 return -EPROTO;
1542         }
1543
1544         if (rc != req->rq_bulk->bd_nob_transferred) {
1545                 CERROR("Unexpected rc %d (%d transferred)\n",
1546                        rc, req->rq_bulk->bd_nob_transferred);
1547                 return -EPROTO;
1548         }
1549
1550         if (rc < aa->aa_requested_nob)
1551                 handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);
1552
1553         if (body->oa.o_valid & OBD_MD_FLCKSUM) {
1554                 static int cksum_counter;
1555                 __u32      server_cksum = body->oa.o_cksum;
1556                 char      *via;
1557                 char      *router;
1558                 cksum_type_t cksum_type;
1559
1560                 cksum_type = cksum_type_unpack(body->oa.o_valid & OBD_MD_FLFLAGS ?
1561                                                body->oa.o_flags : 0);
1562                 client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
1563                                                  aa->aa_ppga, OST_READ,
1564                                                  cksum_type);
1565
1566                 if (peer->nid == req->rq_bulk->bd_sender) {
1567                         via = router = "";
1568                 } else {
1569                         via = " via ";
1570                         router = libcfs_nid2str(req->rq_bulk->bd_sender);
1571                 }
1572
1573                 if (server_cksum != client_cksum) {
1574                         LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
1575                                            "%s%s%s inode "DFID" object "DOSTID
1576                                            " extent ["LPU64"-"LPU64"]\n",
1577                                            req->rq_import->imp_obd->obd_name,
1578                                            libcfs_nid2str(peer->nid),
1579                                            via, router,
1580                                            body->oa.o_valid & OBD_MD_FLFID ?
1581                                                 body->oa.o_parent_seq : (__u64)0,
1582                                            body->oa.o_valid & OBD_MD_FLFID ?
1583                                                 body->oa.o_parent_oid : 0,
1584                                            body->oa.o_valid & OBD_MD_FLFID ?
1585                                                 body->oa.o_parent_ver : 0,
1586                                            POSTID(&body->oa.o_oi),
1587                                            aa->aa_ppga[0]->off,
1588                                            aa->aa_ppga[aa->aa_page_count-1]->off +
1589                                            aa->aa_ppga[aa->aa_page_count-1]->count -
1590                                                                         1);
1591                         CERROR("client %x, server %x, cksum_type %x\n",
1592                                client_cksum, server_cksum, cksum_type);
1593                         cksum_counter = 0;
1594                         aa->aa_oa->o_cksum = client_cksum;
1595                         rc = -EAGAIN;
1596                 } else {
1597                         cksum_counter++;
1598                         CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
1599                         rc = 0;
1600                 }
1601         } else if (unlikely(client_cksum)) {
1602                 static int cksum_missed;
1603
1604                 cksum_missed++;
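                /* Log only when cksum_missed is a power of two (1, 2, 4, ...)
                 * so the console is not flooded. */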
1605                 if ((cksum_missed & (-cksum_missed)) == cksum_missed)
1606                         CERROR("Checksum %u requested from %s but not sent\n",
1607                                cksum_missed, libcfs_nid2str(peer->nid));
1608         } else {
1609                 rc = 0;
1610         }
1611 out:
1612         if (rc >= 0)
1613                 lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
1614                                      aa->aa_oa, &body->oa);
1615
1616         return rc;
1617 }
1618
1619 static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1620                             struct lov_stripe_md *lsm,
1621                             obd_count page_count, struct brw_page **pga,
1622                             struct obd_capa *ocapa)
1623 {
1624         struct ptlrpc_request *req;
1625         int                 rc;
1626         wait_queue_head_t           waitq;
1627         int                 generation, resends = 0;
1628         struct l_wait_info     lwi;
1629
1630         init_waitqueue_head(&waitq);
1631         generation = exp->exp_obd->u.cli.cl_import->imp_generation;
1632
1633 restart_bulk:
1634         rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1635                                   page_count, pga, &req, ocapa, 0, resends);
1636         if (rc != 0)
1637                 return rc;
1638
1639         if (resends) {
1640                 req->rq_generation_set = 1;
1641                 req->rq_import_generation = generation;
1642                 req->rq_sent = cfs_time_current_sec() + resends;
1643         }
1644
1645         rc = ptlrpc_queue_wait(req);
1646
1647         if (rc == -ETIMEDOUT && req->rq_resend) {
1648                 DEBUG_REQ(D_HA, req,  "BULK TIMEOUT");
1649                 ptlrpc_req_finished(req);
1650                 goto restart_bulk;
1651         }
1652
1653         rc = osc_brw_fini_request(req, rc);
1654
1655         ptlrpc_req_finished(req);
1656         /* When the server returns -EINPROGRESS, the client should always
1657          * retry, regardless of how many times the bulk was already resent. */
1658         if (osc_recoverable_error(rc)) {
1659                 resends++;
1660                 if (rc != -EINPROGRESS &&
1661                     !client_should_resend(resends, &exp->exp_obd->u.cli)) {
1662                         CERROR("%s: too many resend retries for object: "
1663                                ""DOSTID", rc = %d.\n", exp->exp_obd->obd_name,
1664                                POSTID(&oa->o_oi), rc);
1665                         goto out;
1666                 }
1667                 if (generation !=
1668                     exp->exp_obd->u.cli.cl_import->imp_generation) {
1669                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
1670                                ""DOSTID", rc = %d.\n", exp->exp_obd->obd_name,
1671                                POSTID(&oa->o_oi), rc);
1672                         goto out;
1673                 }
1674
1675                 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL,
1676                                        NULL);
1677                 l_wait_event(waitq, 0, &lwi);
1678
1679                 goto restart_bulk;
1680         }
1681 out:
1682         if (rc == -EAGAIN || rc == -EINPROGRESS)
1683                 rc = -EIO;
1684         return rc;
1685 }
1686
1687 static int osc_brw_redo_request(struct ptlrpc_request *request,
1688                                 struct osc_brw_async_args *aa, int rc)
1689 {
1690         struct ptlrpc_request *new_req;
1691         struct osc_brw_async_args *new_aa;
1692         struct osc_async_page *oap;
1693
1694         DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1695                   "redo for recoverable error %d", rc);
1696
1697         rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1698                                         OST_WRITE ? OBD_BRW_WRITE : OBD_BRW_READ,
1699                                   aa->aa_cli, aa->aa_oa,
1700                                   NULL /* lsm unused by osc currently */,
1701                                   aa->aa_page_count, aa->aa_ppga,
1702                                   &new_req, aa->aa_ocapa, 0, 1);
1703         if (rc)
1704                 return rc;
1705
1706         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1707                 if (oap->oap_request != NULL) {
1708                         LASSERTF(request == oap->oap_request,
1709                                  "request %p != oap_request %p\n",
1710                                  request, oap->oap_request);
1711                         if (oap->oap_interrupted) {
1712                                 ptlrpc_req_finished(new_req);
1713                                 return -EINTR;
1714                         }
1715                 }
1716         }
1717         /* The new request takes over pga and oaps from the old request.
1718          * Note that copying a list_head doesn't work; it has to be moved. */
1719         aa->aa_resends++;
1720         new_req->rq_interpret_reply = request->rq_interpret_reply;
1721         new_req->rq_async_args = request->rq_async_args;
1722         /* Cap the resend delay to the current request timeout; this is
1723          * similar to what ptlrpc does (see after_reply()). */
1724         if (aa->aa_resends > new_req->rq_timeout)
1725                 new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
1726         else
1727                 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1728         new_req->rq_generation_set = 1;
1729         new_req->rq_import_generation = request->rq_import_generation;
1730
1731         new_aa = ptlrpc_req_async_args(new_req);
1732
1733         INIT_LIST_HEAD(&new_aa->aa_oaps);
1734         list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1735         INIT_LIST_HEAD(&new_aa->aa_exts);
1736         list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1737         new_aa->aa_resends = aa->aa_resends;
1738
1739         list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1740                 if (oap->oap_request) {
1741                         ptlrpc_req_finished(oap->oap_request);
1742                         oap->oap_request = ptlrpc_request_addref(new_req);
1743                 }
1744         }
1745
1746         new_aa->aa_ocapa = aa->aa_ocapa;
1747         aa->aa_ocapa = NULL;
1748
1749         /* XXX: This code will run into problems if we ever support adding
1750          * a series of BRW RPCs into a self-defined ptlrpc_request_set and
1751          * waiting for all of them to finish. We should inherit the request
1752          * set from the old request. */
1753         ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);
1754
1755         DEBUG_REQ(D_INFO, new_req, "new request");
1756         return 0;
1757 }
1758
1759 /*
1760  * Ugh, we want disk allocation on the target to happen in offset order.  We'll
1761  * follow Sedgewick's advice and stick to the dead simple shellsort -- it'll do
1762  * fine for our small page arrays and doesn't require allocation.  It's an
1763  * insertion sort that swaps elements that are strides apart, shrinking the
1764  * stride down until it's 1 and the array is sorted.
1765  */
1766 static void sort_brw_pages(struct brw_page **array, int num)
1767 {
1768         int stride, i, j;
1769         struct brw_page *tmp;
1770
1771         if (num == 1)
1772                 return;
1773         for (stride = 1; stride < num; stride = (stride * 3) + 1)
1774                 ;
1775 
1776         do {
1777                 stride /= 3;
1778                 for (i = stride; i < num; i++) {
1779                         tmp = array[i];
1780                         j = i;
1781                         while (j >= stride && array[j - stride]->off > tmp->off) {
1782                                 array[j] = array[j - stride];
1783                                 j -= stride;
1784                         }
1785                         array[j] = tmp;
1786                 }
1787         } while (stride > 1);
1788 }
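
/*
 * Illustrative sketch (not built into the driver): after sort_brw_pages()
 * the array is ordered by object offset, which the niobuf-merging loop and
 * the LASSERT on pg->off in osc_brw_prep_request() rely on.
 */
#if 0
static void sort_brw_pages_example(void)
{
        struct brw_page p0 = { .off = 2 * PAGE_CACHE_SIZE };
        struct brw_page p1 = { .off = 0 };
        struct brw_page p2 = { .off = 1 * PAGE_CACHE_SIZE };
        struct brw_page *pages[] = { &p0, &p1, &p2 };

        sort_brw_pages(pages, 3);
        /* pages[] now points at p1 (off 0), p2 (off PAGE_CACHE_SIZE),
         * p0 (off 2 * PAGE_CACHE_SIZE) */
}
#endif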
1789
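/*
 * Return the number of leading fragments that together form one physically
 * unfragmented region: every fragment but the last must end on a page
 * boundary and every fragment but the first must start on one.  For example
 * (assuming 4 KiB pages), [0,4096)+[4096,8192)+[8192,8704) yields 3, while
 * [0,2048)+[4096,8192) yields 1 because the first fragment stops short of
 * the end of its page.
 */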
1790 static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1791 {
1792         int count = 1;
1793         int offset;
1794         int i = 0;
1795
1796         LASSERT(pages > 0);
1797         offset = pg[i]->off & ~CFS_PAGE_MASK;
1798
1799         for (;;) {
1800                 pages--;
1801                 if (pages == 0)  /* that's all */
1802                         return count;
1803
1804                 if (offset + pg[i]->count < PAGE_CACHE_SIZE)
1805                         return count;   /* doesn't end on page boundary */
1806
1807                 i++;
1808                 offset = pg[i]->off & ~CFS_PAGE_MASK;
1809                 if (offset != 0)        /* doesn't start on page boundary */
1810                         return count;
1811
1812                 count++;
1813         }
1814 }
1815
1816 static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1817 {
1818         struct brw_page **ppga;
1819         int i;
1820
1821         OBD_ALLOC(ppga, sizeof(*ppga) * count);
1822         if (ppga == NULL)
1823                 return NULL;
1824
1825         for (i = 0; i < count; i++)
1826                 ppga[i] = pga + i;
1827         return ppga;
1828 }
1829
1830 static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1831 {
1832         LASSERT(ppga != NULL);
1833         OBD_FREE(ppga, sizeof(*ppga) * count);
1834 }
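
/*
 * Note: osc_build_ppga() and osc_release_ppga() are used as a pair with the
 * original element count.  osc_brw() below advances its working pointer
 * through the array while issuing chunks, so it keeps the original pointer
 * and count (orig, page_count_orig) around for the release.
 */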
1835
1836 static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1837                    obd_count page_count, struct brw_page *pga,
1838                    struct obd_trans_info *oti)
1839 {
1840         struct obdo *saved_oa = NULL;
1841         struct brw_page **ppga, **orig;
1842         struct obd_import *imp = class_exp2cliimp(exp);
1843         struct client_obd *cli;
1844         int rc, page_count_orig;
1845
1846         LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1847         cli = &imp->imp_obd->u.cli;
1848
1849         if (cmd & OBD_BRW_CHECK) {
1850                 /* The caller just wants to know if there's a chance that this
1851                  * I/O can succeed */
1852
1853                 if (imp->imp_invalid)
1854                         return -EIO;
1855                 return 0;
1856         }
1857
1858         /* test_brw with a failed create can trip this, maybe others. */
1859         LASSERT(cli->cl_max_pages_per_rpc);
1860
1861         rc = 0;
1862
1863         orig = ppga = osc_build_ppga(pga, page_count);
1864         if (ppga == NULL)
1865                 return -ENOMEM;
1866         page_count_orig = page_count;
1867
1868         sort_brw_pages(ppga, page_count);
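        /*
         * Issue the I/O in chunks of at most cl_max_pages_per_rpc pages,
         * trimmed further by max_unfragmented_pages() so that each chunk
         * covers an unfragmented, page-aligned region.
         */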
1869         while (page_count) {
1870                 obd_count pages_per_brw;
1871
1872                 if (page_count > cli->cl_max_pages_per_rpc)
1873                         pages_per_brw = cli->cl_max_pages_per_rpc;
1874                 else
1875                         pages_per_brw = page_count;
1876
1877                 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1878
1879                 if (saved_oa != NULL) {
1880                         /* restore previously saved oa */
1881                         *oinfo->oi_oa = *saved_oa;
1882                 } else if (page_count > pages_per_brw) {
1883                         /* save a copy of oa (brw will clobber it) */
1884                         OBDO_ALLOC(saved_oa);
1885                         if (saved_oa == NULL)
1886                                 GOTO(out, rc = -ENOMEM);
1887                         *saved_oa = *oinfo->oi_oa;
1888                 }
1889
1890                 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1891                                       pages_per_brw, ppga, oinfo->oi_capa);
1892
1893                 if (rc != 0)
1894                         break;
1895
1896                 page_count -= pages_per_brw;
1897                 ppga += pages_per_brw;
1898         }
1899
1900 out:
1901         osc_release_ppga(orig, page_count_orig);
1902
1903         if (saved_oa != NULL)
1904                 OBDO_FREE(saved_oa);
1905
1906         return rc;
1907 }
1908
1909 static int brw_interpret(const struct lu_env *env,
1910                          struct ptlrpc_request *req, void *data, int rc)
1911 {
1912         struct osc_brw_async_args *aa = data;
1913         struct osc_extent *ext;
1914         struct osc_extent *tmp;
1915         struct cl_object  *obj = NULL;
1916         struct client_obd *cli = aa->aa_cli;
1917
1918         rc = osc_brw_fini_request(req, rc);
1919         CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1920         /* When the server returns -EINPROGRESS, the client should always
1921          * retry, regardless of how many times the bulk was already resent. */
1922         if (osc_recoverable_error(rc)) {
1923                 if (req->rq_import_generation !=
1924                     req->rq_import->imp_generation) {
1925                         CDEBUG(D_HA, "%s: resend cross eviction for object: "
1926                                ""DOSTID", rc = %d.\n",
1927                                req->rq_import->imp_obd->obd_name,
1928                                POSTID(&aa->aa_oa->o_oi), rc);
1929                 } else if (rc == -EINPROGRESS ||
1930                     client_should_resend(aa->aa_resends, aa->aa_cli)) {
1931                         rc = osc_brw_redo_request(req, aa, rc);
1932                 } else {
1933                         CERROR("%s: too many resend retries for object: "
1934                                ""LPU64":"LPU64", rc = %d.\n",
1935                                req->rq_import->imp_obd->obd_name,
1936                                POSTID(&aa->aa_oa->o_oi), rc);
1937                 }
1938
1939                 if (rc == 0)
1940                         return 0;
1941                 else if (rc == -EAGAIN || rc == -EINPROGRESS)
1942                         rc = -EIO;
1943         }
1944
1945         if (aa->aa_ocapa) {
1946                 capa_put(aa->aa_ocapa);
1947                 aa->aa_ocapa = NULL;
1948         }
1949
1950         list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1951                 if (obj == NULL && rc == 0) {
1952                         obj = osc2cl(ext->oe_obj);
1953                         cl_object_get(obj);
1954                 }
1955
1956                 list_del_init(&ext->oe_link);
1957                 osc_extent_finish(env, ext, 1, rc);
1958         }
1959         LASSERT(list_empty(&aa->aa_exts));
1960         LASSERT(list_empty(&aa->aa_oaps));
1961
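        /*
         * Propagate the block count and timestamps the OST returned into the
         * cl_object attributes, so the client's cached attributes reflect
         * what the server reported.
         */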
1962         if (obj != NULL) {
1963                 struct obdo *oa = aa->aa_oa;
1964                 struct cl_attr *attr  = &osc_env_info(env)->oti_attr;
1965                 unsigned long valid = 0;
1966
1967                 LASSERT(rc == 0);
1968                 if (oa->o_valid & OBD_MD_FLBLOCKS) {
1969                         attr->cat_blocks = oa->o_blocks;
1970                         valid |= CAT_BLOCKS;
1971                 }
1972                 if (oa->o_valid & OBD_MD_FLMTIME) {
1973                         attr->cat_mtime = oa->o_mtime;
1974                         valid |= CAT_MTIME;
1975                 }
1976                 if (oa->o_valid & OBD_MD_FLATIME) {
1977                         attr->cat_atime = oa->o_atime;
1978                         valid |= CAT_ATIME;
1979                 }
1980                 if (oa->o_valid & OBD_MD_FLCTIME) {
1981                         attr->cat_ctime = oa->o_ctime;
1982                         valid |= CAT_CTIME;
1983                 }
1984                 if (valid != 0) {
1985                         cl_object_attr_lock(obj);
1986                         cl_object_attr_set(env, obj, attr, valid);
1987                         cl_object_attr_unlock(obj);
1988                 }
1989                 cl_object_put(env, obj);
1990         }
1991         OBDO_FREE(aa->aa_oa);
1992
1993         cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
1994                           req->rq_bulk->bd_nob_transferred);
1995         osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
1996         ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
1997
1998         client_obd_list_lock(&cli->cl_loi_list_lock);
1999         /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2000          * is called so we know whether to go to sync BRWs or wait for more
2001          * RPCs to complete */
2002         if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2003                 cli->cl_w_in_flight--;
2004         else
2005                 cli->cl_r_in_flight--;
2006         osc_wake_cache_waiters(cli);
2007         client_obd_list_unlock(&cli->cl_loi_list_lock);
2008
2009         osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
2010         return rc;
2011 }
2012
2013 /**
2014  * Build an RPC from the list of extents @ext_list. The caller must ensure
2015  * that the total number of pages in this list does not exceed the maximum
2016  * pages allowed per RPC. Extents in the list must be in OES_RPC state.
2017  */
2018 int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
2019                   struct list_head *ext_list, int cmd, pdl_policy_t pol)
2020 {
2021         struct ptlrpc_request           *req = NULL;
2022         struct osc_extent               *ext;
2023         struct brw_page                 **pga = NULL;
2024         struct osc_brw_async_args       *aa = NULL;
2025         struct obdo                     *oa = NULL;
2026         struct osc_async_page           *oap;
2027         struct osc_async_page           *tmp;
2028         struct cl_req                   *clerq = NULL;
2029         enum cl_req_type                crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE :
2030                                                                       CRT_READ;
2031         struct ldlm_lock                *lock = NULL;
2032         struct cl_req_attr              *crattr = NULL;
2033         obd_off                         starting_offset = OBD_OBJECT_EOF;
2034         obd_off                         ending_offset = 0;
2035         int                             mpflag = 0;
2036         int                             mem_tight = 0;
2037         int                             page_count = 0;
2038         int                             i;
2039         int                             rc;
2040         LIST_HEAD(rpc_list);
2041
2042         LASSERT(!list_empty(ext_list));
2043
2044         /* add pages into rpc_list to build BRW rpc */
2045         list_for_each_entry(ext, ext_list, oe_link) {
2046                 LASSERT(ext->oe_state == OES_RPC);
2047                 mem_tight |= ext->oe_memalloc;
2048                 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
2049                         ++page_count;
2050                         list_add_tail(&oap->oap_rpc_item, &rpc_list);
2051                         if (starting_offset > oap->oap_obj_off)
2052                                 starting_offset = oap->oap_obj_off;
2053                         else
2054                                 LASSERT(oap->oap_page_off == 0);
2055                         if (ending_offset < oap->oap_obj_off + oap->oap_count)
2056                                 ending_offset = oap->oap_obj_off +
2057                                                 oap->oap_count;
2058                         else
2059                                 LASSERT(oap->oap_page_off + oap->oap_count ==
2060                                         PAGE_CACHE_SIZE);
2061                 }
2062         }
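
        /*
         * At this point rpc_list holds every page of every extent, page_count
         * is their total, and [starting_offset, ending_offset) bounds the
         * object region covered by this RPC.
         */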
2063
2064         if (mem_tight)
2065                 mpflag = cfs_memory_pressure_get_and_set();
2066
2067         OBD_ALLOC(crattr, sizeof(*crattr));
2068         if (crattr == NULL)
2069                 GOTO(out, rc = -ENOMEM);
2070
2071         OBD_ALLOC(pga, sizeof(*pga) * page_count);
2072         if (pga == NULL)
2073                 GOTO(out, rc = -ENOMEM);
2074
2075         OBDO_ALLOC(oa);
2076         if (oa == NULL)
2077                 GOTO(out, rc = -ENOMEM);
2078
2079         i = 0;
2080         list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
2081                 struct cl_page *page = oap2cl_page(oap);
2082                 if (clerq == NULL) {
2083                         clerq = cl_req_alloc(env, page, crt,
2084                                              1 /* only 1-object rpcs for now */);
2085                         if (IS_ERR(clerq))
2086                                 GOTO(out, rc = PTR_ERR(clerq));
2087                         lock = oap->oap_ldlm_lock;
2088                 }
2089                 if (mem_tight)
2090                         oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2091                 pga[i] = &oap->oap_brw_page;
2092                 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2093                 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2094                        pga[i]->pg, page_index(oap->oap_page), oap,
2095                        pga[i]->flag);
2096                 i++;
2097                 cl_req_page_add(env, clerq, page);
2098         }
2099
2100         /* always get the data for the obdo for the rpc */
2101         LASSERT(clerq != NULL);
2102         crattr->cra_oa = oa;
2103         cl_req_attr_set(env, clerq, crattr, ~0ULL);
2104         if (lock) {
2105                 oa->o_handle = lock->l_remote_handle;
2106                 oa->o_valid |= OBD_MD_FLHANDLE;
2107         }
2108
2109         rc = cl_req_prep(env, clerq);
2110         if (rc != 0) {
2111                 CERROR("cl_req_prep failed: %d\n", rc);
2112                 GOTO(out, rc);
2113         }
2114
2115         sort_brw_pages(pga, page_count);
2116         rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2117                         pga, &req, crattr->cra_capa, 1, 0);
2118         if (rc != 0) {
2119                 CERROR("prep_req failed: %d\n", rc);
2120                 GOTO(out, rc);
2121         }
2122
2123         req->rq_interpret_reply = brw_interpret;
2124
2125         if (mem_tight != 0)
2126                 req->rq_memalloc = 1;
2127
2128         /* Need to update the timestamps after the request is built in case
2129          * we race with setattr (locally or in queue at OST).  If OST gets
2130          * later setattr before earlier BRW (as determined by the request xid),
2131          * the OST will not use BRW timestamps.  Sadly, there is no obvious
2132          * way to do this in a single call.  bug 10150 */
2133         cl_req_attr_set(env, clerq, crattr,
2134                         OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2135
2136         lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
2137
2138         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2139         aa = ptlrpc_req_async_args(req);
2140         INIT_LIST_HEAD(&aa->aa_oaps);
2141         list_splice_init(&rpc_list, &aa->aa_oaps);
2142         INIT_LIST_HEAD(&aa->aa_exts);
2143         list_splice_init(ext_list, &aa->aa_exts);
2144         aa->aa_clerq = clerq;
2145
2146         /* Queued sync pages can be torn down while the pages
2147          * were between the pending list and the RPC. */
2148         tmp = NULL;
2149         list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2150                 /* only one oap gets a request reference */
2151                 if (tmp == NULL)
2152                         tmp = oap;
2153                 if (oap->oap_interrupted && !req->rq_intr) {
2154                         CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2155                                         oap, req);
2156                         ptlrpc_mark_interrupted(req);
2157                 }
2158         }
2159         if (tmp != NULL)
2160                 tmp->oap_request = ptlrpc_request_addref(req);
2161
2162         client_obd_list_lock(&cli->cl_loi_list_lock);
2163         starting_offset >>= PAGE_CACHE_SHIFT;
2164         if (cmd == OBD_BRW_READ) {
2165                 cli->cl_r_in_flight++;
2166                 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2167                 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2168                 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2169                                       starting_offset + 1);
2170         } else {
2171                 cli->cl_w_in_flight++;
2172                 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2173                 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2174                 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2175                                       starting_offset + 1);
2176         }
2177         client_obd_list_unlock(&cli->cl_loi_list_lock);
2178
2179         DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2180                   page_count, aa, cli->cl_r_in_flight,
2181                   cli->cl_w_in_flight);
2182
2183         /* XXX: Maybe the caller can check the RPC bulk descriptor to
2184          * see which CPU/NUMA node the majority of pages were allocated
2185          * on, and try to assign the async RPC to the CPU core
2186          * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
2187          *
2188          * But on the other hand, we expect that multiple ptlrpcd
2189          * threads and the initial write sponsor can run in parallel,
2190          * especially when data checksum is enabled, which is CPU-bound
2191          * operation and single ptlrpcd thread cannot process in time.
2192          * So more ptlrpcd threads sharing BRW load
2193          * (with PDL_POLICY_ROUND) seems better.
2194          */
2195         ptlrpcd_add_req(req, pol, -1);
2196         rc = 0;
2197
2198 out:
2199         if (mem_tight != 0)
2200                 cfs_memory_pressure_restore(mpflag);
2201
2202         if (crattr != NULL) {
2203                 capa_put(crattr->cra_capa);
2204                 OBD_FREE(crattr, sizeof(*crattr));
2205         }
2206
2207         if (rc != 0) {
2208                 LASSERT(req == NULL);
2209
2210                 if (oa)
2211                         OBDO_FREE(oa);
2212                 if (pga)
2213                         OBD_FREE(pga, sizeof(*pga) * page_count);
2214                 /* This should happen rarely and is pretty bad; it makes the
2215                  * pending list not follow the dirty order. */
2216                 while (!list_empty(ext_list)) {
2217                         ext = list_entry(ext_list->next, struct osc_extent,
2218                                              oe_link);
2219                         list_del_init(&ext->oe_link);
2220                         osc_extent_finish(env, ext, 0, rc);
2221                 }
2222                 if (clerq && !IS_ERR(clerq))
2223                         cl_req_completion(env, clerq, rc);
2224         }
2225         return rc;
2226 }
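
/*
 * Illustrative sketch (not part of this file): a caller -- typically the RPC
 * engine in osc_cache.c -- gathers extents that are already in OES_RPC state
 * onto a private list and hands the whole list to osc_build_rpc(), e.g.:
 *
 *      LIST_HEAD(rpclist);
 *
 *      ... move ready extents onto rpclist and switch them to OES_RPC ...
 *      rc = osc_build_rpc(env, cli, &rpclist, OBD_BRW_WRITE, PDL_POLICY_ROUND);
 */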
2227
2228 static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
2229                                         struct ldlm_enqueue_info *einfo)
2230 {
2231         void *data = einfo->ei_cbdata;
2232         int set = 0;
2233
2234         LASSERT(lock != NULL);
2235         LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
2236         LASSERT(lock->l_resource->lr_type == einfo->ei_type);
2237         LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
2238         LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
2239
2240         lock_res_and_lock(lock);
2241         spin_lock(&osc_ast_guard);
2242
2243         if (lock->l_ast_data == NULL)
2244                 lock->l_ast_data = data;
2245         if (lock->l_ast_data == data)
2246                 set = 1;
2247
2248         spin_unlock(&osc_ast_guard);
2249         unlock_res_and_lock(lock);
2250
2251         return set;
2252 }
2253
2254 static int osc_set_data_with_check(struct lustre_handle *lockh,
2255                                    struct ldlm_enqueue_info *einfo)
2256 {
2257         struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2258         int set = 0;
2259
2260         if (lock != NULL) {
2261                 set = osc_set_lock_data_with_check(lock, einfo);
2262                 LDLM_LOCK_PUT(lock);
2263         } else
2264                 CERROR("lockh %p, data %p - client evicted?\n",
2265                        lockh, einfo->ei_cbdata);
2266         return set;
2267 }
2268
2269 static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2270                              ldlm_iterator_t replace, void *data)
2271 {
2272         struct ldlm_res_id res_id;
2273         struct obd_device *obd = class_exp2obd(exp);
2274
2275         ostid_build_res_name(&lsm->lsm_oi, &res_id);
2276         ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2277         return 0;
2278 }
2279
2280 /* Find any ldlm lock of the inode in osc.
2281  * Return 0     if no lock is found
2282  *        1     if one is found
2283  *        < 0   on error */
2284 static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2285                            ldlm_iterator_t replace, void *data)
2286 {
2287         struct ldlm_res_id res_id;
2288         struct obd_device *obd = class_exp2obd(exp);
2289         int rc = 0;
2290
2291         ostid_build_res_name(&lsm->lsm_oi, &res_id);
2292         rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2293         if (rc == LDLM_ITER_STOP)
2294                 return 1;
2295         if (rc == LDLM_ITER_CONTINUE)
2296                 return 0;
2297         return rc;
2298 }
2299
2300 static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
2301                             obd_enqueue_update_f upcall, void *cookie,
2302                             __u64 *flags, int agl, int rc)
2303 {
2304         int intent = *flags & LDLM_FL_HAS_INTENT;
2305
2306         if (intent) {
2307                 /* The request was created before ldlm_cli_enqueue call. */
2308                 if (rc == ELDLM_LOCK_ABORTED) {
2309                         struct ldlm_reply *rep;
2310                         rep = req_capsule_server_get(&req->rq_pill,
2311                                                      &RMF_DLM_REP);
2312
2313                         LASSERT(rep != NULL);
2314                         rep->lock_policy_res1 =
2315                                 ptlrpc_status_ntoh(rep->lock_policy_res1);
2316                         if (rep->lock_policy_res1)
2317                                 rc = rep->lock_policy_res1;
2318                 }
2319         }
2320
2321         if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) ||
2322             (rc == 0)) {
2323                 *flags |= LDLM_FL_LVB_READY;
2324                 CDEBUG(D_INODE, "got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2325                        lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
2326         }
2327
2328         /* Call the update callback. */
2329         rc = (*upcall)(cookie, rc);
2330         return rc;
2331 }
2332
2333 static int osc_enqueue_interpret(const struct lu_env *env,
2334                                  struct ptlrpc_request *req,
2335                                  struct osc_enqueue_args *aa, int rc)
2336 {
2337         struct ldlm_lock *lock;
2338         struct lustre_handle handle;
2339         __u32 mode;
2340         struct ost_lvb *lvb;
2341         __u32 lvb_len;
2342         __u64 *flags = aa->oa_flags;
2343
2344         /* Make a local copy of a lock handle and a mode, because aa->oa_*
2345          * might be freed anytime after lock upcall has been called. */
2346         lustre_handle_copy(&handle, aa->oa_lockh);
2347         mode = aa->oa_ei->ei_mode;
2348
2349         /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2350          * be valid. */
2351         lock = ldlm_handle2lock(&handle);
2352
2353         /* Take an additional reference so that a blocking AST that
2354          * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2355          * to arrive after an upcall has been executed by
2356          * osc_enqueue_fini(). */
2357         ldlm_lock_addref(&handle, mode);
2358
2359         /* Let the CP AST grant the lock first. */
2360         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2361
2362         if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) {
2363                 lvb = NULL;
2364                 lvb_len = 0;
2365         } else {
2366                 lvb = aa->oa_lvb;
2367                 lvb_len = sizeof(*aa->oa_lvb);
2368         }
2369
2370         /* Complete the lock-obtaining procedure. */
2371         rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2372                                    mode, flags, lvb, lvb_len, &handle, rc);
2373         /* Complete osc stuff. */
2374         rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie,
2375                               flags, aa->oa_agl, rc);
2376
2377         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2378
2379         /* Release the lock for async request. */
2380         if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
2381                 /*
2382                  * Releases a reference taken by ldlm_cli_enqueue(), if it is
2383                  * not already released by
2384                  * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
2385                  */
2386                 ldlm_lock_decref(&handle, mode);
2387
2388         LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2389                  aa->oa_lockh, req, aa);
2390         ldlm_lock_decref(&handle, mode);
2391         LDLM_LOCK_PUT(lock);
2392         return rc;
2393 }
2394
2395 void osc_update_enqueue(struct lustre_handle *lov_lockhp,
2396                         struct lov_oinfo *loi, __u64 flags,
2397                         struct ost_lvb *lvb, __u32 mode, int rc)
2398 {
2399         struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
2400
2401         if (rc == ELDLM_OK) {
2402                 __u64 tmp;
2403
2404                 LASSERT(lock != NULL);
2405                 loi->loi_lvb = *lvb;
2406                 tmp = loi->loi_lvb.lvb_size;
2407                 /* Extend KMS up to the end of this lock and no further
2408                  * A lock on [x,y] means a KMS of up to y + 1 bytes! */
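                /* E.g. a granted lock covering bytes [0, 4095] permits a KMS
                 * of 4096. */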
2409                 if (tmp > lock->l_policy_data.l_extent.end)
2410                         tmp = lock->l_policy_data.l_extent.end + 1;
2411                 if (tmp >= loi->loi_kms) {
2412                         LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
2413                                    ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
2414                         loi_kms_set(loi, tmp);
2415                 } else {
2416                         LDLM_DEBUG(lock, "lock acquired, setting rss="
2417                                    LPU64"; leaving kms="LPU64", end="LPU64,
2418                                    loi->loi_lvb.lvb_size, loi->loi_kms,
2419                                    lock->l_policy_data.l_extent.end);
2420                 }
2421                 ldlm_lock_allow_match(lock);
2422         } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
2423                 LASSERT(lock != NULL);
2424                 loi->loi_lvb = *lvb;
2425                 ldlm_lock_allow_match(lock);
2426                 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
2427                        " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
2428                 rc = ELDLM_OK;
2429         }
2430
2431         if (lock != NULL) {
2432                 if (rc != ELDLM_OK)
2433                         ldlm_lock_fail_match(lock);
2434
2435                 LDLM_LOCK_PUT(lock);
2436         }
2437 }
2438 EXPORT_SYMBOL(osc_update_enqueue);
2439
2440 struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2441
2442 /* When enqueuing asynchronously, locks are not ordered; we can obtain a lock
2443  * from the 2nd OSC before a lock from the 1st one. This does not deadlock with
2444  * other synchronous requests, but keeping some locks while trying to obtain
2445  * others may take a considerable amount of time in the case of an OST failure;
2446  * and when other sync requests do not get a lock released by a client, the
2447  * client is excluded from the cluster -- such scenarios make life difficult,
2448  * so release locks just after they are obtained. */
2449 int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2450                      __u64 *flags, ldlm_policy_data_t *policy,
2451                      struct ost_lvb *lvb, int kms_valid,
2452                      obd_enqueue_update_f upcall, void *cookie,
2453                      struct ldlm_enqueue_info *einfo,
2454                      struct lustre_handle *lockh,
2455                      struct ptlrpc_request_set *rqset, int async, int agl)
2456 {
2457         struct obd_device *obd = exp->exp_obd;
2458         struct ptlrpc_request *req = NULL;
2459         int intent = *flags & LDLM_FL_HAS_INTENT;
2460         __u64 match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY);
2461         ldlm_mode_t mode;
2462         int rc;
2463
2464         /* Filesystem lock extents are extended to page boundaries so that
2465          * dealing with the page cache is a little smoother.  */
2466         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2467         policy->l_extent.end |= ~CFS_PAGE_MASK;
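        /* E.g. (with 4 KiB pages) a request for bytes [100, 5000] is widened
         * to the page-aligned extent [0, 8191]. */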
2468
2469         /*
2470          * kms is not valid when either object is completely fresh (so that no
2471          * locks are cached), or object was evicted. In the latter case cached
2472          * lock cannot be used, because it would prime inode state with
2473          * potentially stale LVB.
2474          */
2475         if (!kms_valid)
2476                 goto no_match;
2477
2478         /* Next, search for already existing extent locks that will cover us */
2479         /* If we're trying to read, we also search for an existing PW lock.  The
2480          * VFS and page cache already protect us locally, so lots of readers/
2481          * writers can share a single PW lock.
2482          *
2483          * There are problems with conversion deadlocks, so instead of
2484          * converting a read lock to a write lock, we'll just enqueue a new
2485          * one.
2486          *
2487          * At some point we should cancel the read lock instead of making them
2488          * send us a blocking callback, but there are problems with canceling
2489          * locks out from other users right now, too. */
2490         mode = einfo->ei_mode;
2491         if (einfo->ei_mode == LCK_PR)
2492                 mode |= LCK_PW;
2493         mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
2494                                einfo->ei_type, policy, mode, lockh, 0);
2495         if (mode) {
2496                 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
2497
2498                 if ((agl != 0) && !(matched->l_flags & LDLM_FL_LVB_READY)) {
2499                         /* For AGL, if the enqueue RPC is sent but the lock
2500                          * is not granted, then skip processing this stripe.
2501                          * Return -ECANCELED to tell the caller. */
2502                         ldlm_lock_decref(lockh, mode);
2503                         LDLM_LOCK_PUT(matched);
2504                         return -ECANCELED;
2505                 } else if (osc_set_lock_data_with_check(matched, einfo)) {
2506                         *flags |= LDLM_FL_LVB_READY;
2507                         /* Addref the lock only if not an async request and
2508                          * a PW lock was matched whereas we asked for PR. */
2509                         if (!rqset && einfo->ei_mode != mode)
2510                                 ldlm_lock_addref(lockh, LCK_PR);
2511                         if (intent) {
2512                                 /* I would like to be able to ASSERT here that
2513                                  * rss <= kms, but I can't, for reasons which
2514                                  * are explained in lov_enqueue() */
2515                         }
2516
2517                         /* We already have a lock, and it's referenced.
2518                          *
2519                          * At this point, the cl_lock::cll_state is CLS_QUEUING,
2520                          * AGL upcall may change it to CLS_HELD directly. */
2521                         (*upcall)(cookie, ELDLM_OK);
2522
2523                         if (einfo->ei_mode != mode)
2524                                 ldlm_lock_decref(lockh, LCK_PW);
2525                         else if (rqset)
2526                                 /* For async requests, decref the lock. */
2527                                 ldlm_lock_decref(lockh, einfo->ei_mode);
2528                         LDLM_LOCK_PUT(matched);
2529                         return ELDLM_OK;
2530                 } else {
2531                         ldlm_lock_decref(lockh, mode);
2532                         LDLM_LOCK_PUT(matched);
2533                 }
2534         }
2535
2536  no_match:
2537         if (intent) {
2538                 LIST_HEAD(cancels);
2539                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2540                                            &RQF_LDLM_ENQUEUE_LVB);
2541                 if (req == NULL)
2542                         return -ENOMEM;
2543
2544                 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
2545                 if (rc) {
2546                         ptlrpc_request_free(req);
2547                         return rc;
2548                 }
2549
2550                 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2551                                      sizeof(*lvb));
2552                 ptlrpc_request_set_replen(req);
2553         }
2554
2555         /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2556         *flags &= ~LDLM_FL_BLOCK_GRANTED;
2557
2558         rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2559                               sizeof(*lvb), LVB_T_OST, lockh, async);
2560         if (rqset) {
2561                 if (!rc) {
2562                         struct osc_enqueue_args *aa;
2563                         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2564                         aa = ptlrpc_req_async_args(req);
2565                         aa->oa_ei = einfo;
2566                         aa->oa_exp = exp;
2567                         aa->oa_flags  = flags;
2568                         aa->oa_upcall = upcall;
2569                         aa->oa_cookie = cookie;
2570                         aa->oa_lvb    = lvb;
2571                         aa->oa_lockh  = lockh;
2572                         aa->oa_agl    = !!agl;
2573
2574                         req->rq_interpret_reply =
2575                                 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2576                         if (rqset == PTLRPCD_SET)
2577                                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
2578                         else
2579                                 ptlrpc_set_add_req(rqset, req);
2580                 } else if (intent) {
2581                         ptlrpc_req_finished(req);
2582                 }
2583                 return rc;
2584         }
2585
2586         rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc);
2587         if (intent)
2588                 ptlrpc_req_finished(req);
2589
2590         return rc;
2591 }
2592
2593 static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
2594                        struct ldlm_enqueue_info *einfo,
2595                        struct ptlrpc_request_set *rqset)
2596 {
2597         struct ldlm_res_id res_id;
2598         int rc;
2599
2600         ostid_build_res_name(&oinfo->oi_md->lsm_oi, &res_id);
2601         rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
2602                               &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
2603                               oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
2604                               oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
2605                               rqset, rqset != NULL, 0);
2606         return rc;
2607 }
2608
2609 int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2610                    __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2611                    __u64 *flags, void *data, struct lustre_handle *lockh,
2612                    int unref)
2613 {
2614         struct obd_device *obd = exp->exp_obd;
2615         __u64 lflags = *flags;
2616         ldlm_mode_t rc;
2617
2618         if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2619                 return -EIO;
2620
2621         /* Filesystem lock extents are extended to page boundaries so that
2622          * dealing with the page cache is a little smoother */
2623         policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2624         policy->l_extent.end |= ~CFS_PAGE_MASK;
2625
2626         /* Next, search for already existing extent locks that will cover us */
2627         /* If we're trying to read, we also search for an existing PW lock.  The
2628          * VFS and page cache already protect us locally, so lots of readers/
2629          * writers can share a single PW lock. */
2630         rc = mode;
2631         if (mode == LCK_PR)
2632                 rc |= LCK_PW;
2633         rc = ldlm_lock_match(obd->obd_namespace, lflags,
2634                              res_id, type, policy, rc, lockh, unref);
2635         if (rc) {
2636                 if (data != NULL) {
2637                         if (!osc_set_data_with_check(lockh, data)) {
2638                                 if (!(lflags & LDLM_FL_TEST_LOCK))
2639                                         ldlm_lock_decref(lockh, rc);
2640                                 return 0;
2641                         }
2642                 }
2643                 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2644                         ldlm_lock_addref(lockh, LCK_PR);
2645                         ldlm_lock_decref(lockh, LCK_PW);
2646                 }
2647                 return rc;
2648         }
2649         return rc;
2650 }
2651
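/*
 * Drop a lock reference obtained from osc_enqueue_base()/osc_match_base().
 * Group locks are cancelled as soon as the reference is dropped; for other
 * modes the reference is simply released and the cached lock is left to the
 * usual LRU/cancel machinery.
 */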
2652 int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
2653 {
2654         if (unlikely(mode == LCK_GROUP))
2655                 ldlm_lock_decref_and_cancel(lockh, mode);
2656         else
2657                 ldlm_lock_decref(lockh, mode);
2658
2659         return 0;
2660 }
2661
2662 static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
2663                       __u32 mode, struct lustre_handle *lockh)
2664 {
2665         return osc_cancel_base(lockh, mode);
2666 }
2667
2668 static int osc_cancel_unused(struct obd_export *exp,
2669                              struct lov_stripe_md *lsm,
2670                              ldlm_cancel_flags_t flags,
2671                              void *opaque)
2672 {
2673         struct obd_device *obd = class_exp2obd(exp);
2674         struct ldlm_res_id res_id, *resp = NULL;
2675
2676         if (lsm != NULL) {
2677                 ostid_build_res_name(&lsm->lsm_oi, &res_id);
2678                 resp = &res_id;
2679         }
2680
2681         return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
2682 }
2683
2684 static int osc_statfs_interpret(const struct lu_env *env,
2685                                 struct ptlrpc_request *req,
2686                                 struct osc_async_args *aa, int rc)
2687 {
2688         struct obd_statfs *msfs;
2689
2690         if (rc == -EBADR)
2691                 /* The request has in fact never been sent
2692                  * due to issues at a higher level (LOV).
2693                  * Exit immediately since the caller is
2694                  * aware of the problem and takes care
2695                  * of the cleanup. */
2696                 return rc;
2697
2698         if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2699             (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2700                 GOTO(out, rc = 0);
2701
2702         if (rc != 0)
2703                 GOTO(out, rc);
2704
2705         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2706         if (msfs == NULL)
2707                 GOTO(out, rc = -EPROTO);
2709
2710         *aa->aa_oi->oi_osfs = *msfs;
2711 out:
2712         rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2713         return rc;
2714 }
2715
2716 static int osc_statfs_async(struct obd_export *exp,
2717                             struct obd_info *oinfo, __u64 max_age,
2718                             struct ptlrpc_request_set *rqset)
2719 {
2720         struct obd_device     *obd = class_exp2obd(exp);
2721         struct ptlrpc_request *req;
2722         struct osc_async_args *aa;
2723         int                 rc;
2724
2725         /* We could possibly pass max_age in the request (as an absolute
2726          * timestamp or a "seconds.usec ago") so the target can avoid doing
2727          * extra calls into the filesystem if that isn't necessary (e.g.
2728          * during mount that would help a bit).  Having relative timestamps
2729          * is not so great if request processing is slow, while absolute
2730          * timestamps are not ideal because they need time synchronization. */
2731         req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2732         if (req == NULL)
2733                 return -ENOMEM;
2734
2735         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2736         if (rc) {
2737                 ptlrpc_request_free(req);
2738                 return rc;
2739         }
2740         ptlrpc_request_set_replen(req);
2741         req->rq_request_portal = OST_CREATE_PORTAL;
2742         ptlrpc_at_set_req_timeout(req);
2743
2744         if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2745                 /* procfs requests must not wait or be resent, to avoid deadlock */
2746                 req->rq_no_resend = 1;
2747                 req->rq_no_delay = 1;
2748         }
2749
2750         req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2751         CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2752         aa = ptlrpc_req_async_args(req);
2753         aa->aa_oi = oinfo;
2754
2755         ptlrpc_set_add_req(rqset, req);
2756         return 0;
2757 }
2758
2759 static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2760                       struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2761 {
2762         struct obd_device     *obd = class_exp2obd(exp);
2763         struct obd_statfs     *msfs;
2764         struct ptlrpc_request *req;
2765         struct obd_import     *imp = NULL;
2766         int rc;
2767
2768         /* Since the request might also come from lprocfs, we need to
2769          * sync this with client_disconnect_export() (bug 15684). */
2770         down_read(&obd->u.cli.cl_sem);
2771         if (obd->u.cli.cl_import)
2772                 imp = class_import_get(obd->u.cli.cl_import);
2773         up_read(&obd->u.cli.cl_sem);
2774         if (!imp)
2775                 return -ENODEV;
2776
2777         /* We could possibly pass max_age in the request (as an absolute
2778          * timestamp or a "seconds.usec ago") so the target can avoid doing
2779          * extra calls into the filesystem if that isn't necessary (e.g.
2780          * during mount that would help a bit).  Having relative timestamps
2781          * is not so great if request processing is slow, while absolute
2782          * timestamps are not ideal because they need time synchronization. */
2783         req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2784
2785         class_import_put(imp);
2786
2787         if (req == NULL)
2788                 return -ENOMEM;
2789
2790         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2791         if (rc) {
2792                 ptlrpc_request_free(req);
2793                 return rc;
2794         }
2795         ptlrpc_request_set_replen(req);
2796         req->rq_request_portal = OST_CREATE_PORTAL;
2797         ptlrpc_at_set_req_timeout(req);
2798
2799         if (flags & OBD_STATFS_NODELAY) {
2800                 /* procfs requests must not wait or be resent, to avoid deadlock */
2801                 req->rq_no_resend = 1;
2802                 req->rq_no_delay = 1;
2803         }
2804
2805         rc = ptlrpc_queue_wait(req);
2806         if (rc)
2807                 GOTO(out, rc);
2808
2809         msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2810         if (msfs == NULL)
2811                 GOTO(out, rc = -EPROTO);
2813
2814         *osfs = *msfs;
2815
2816  out:
2817         ptlrpc_req_finished(req);
2818         return rc;
2819 }
2820
2821 /* Retrieve object striping information.
2822  *
2823  * @lump is a pointer to an in-core struct with lmm_stripe_count indicating
2824  * the maximum number of OST indices which will fit in the user buffer.
2825  * lmm_magic must be LOV_USER_MAGIC_V1 or _V3 (we only use one slot here).
2826  */
2827 static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
2828 {
2829         /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
2830         struct lov_user_md_v3 lum, *lumk;
2831         struct lov_user_ost_data_v1 *lmm_objects;
2832         int rc = 0, lum_size;
2833
2834         if (!lsm)
2835                 return -ENODATA;
2836
2837         /* we only need the header part from user space to get lmm_magic and
2838          * lmm_stripe_count, (the header part is common to v1 and v3) */
2839         lum_size = sizeof(struct lov_user_md_v1);
2840         if (copy_from_user(&lum, lump, lum_size))
2841                 return -EFAULT;
2842
2843         if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
2844             (lum.lmm_magic != LOV_USER_MAGIC_V3))
2845                 return -EINVAL;
2846
2847         /* lov_user_md_vX and lov_mds_md_vX must have the same size */
2848         LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
2849         LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
2850         LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
2851
2852         /* we can use lov_mds_md_size() to compute lum_size
2853          * because lov_user_md_vX and lov_mds_md_vX have the same size */
2854         if (lum.lmm_stripe_count > 0) {
2855                 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
2856                 OBD_ALLOC(lumk, lum_size);
2857                 if (!lumk)
2858                         return -ENOMEM;
2859
2860                 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
2861                         lmm_objects =
2862                             &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
2863                 else
2864                         lmm_objects = &(lumk->lmm_objects[0]);
2865                 lmm_objects->l_ost_oi = lsm->lsm_oi;
2866         } else {
2867                 lum_size = lov_mds_md_size(0, lum.lmm_magic);
2868                 lumk = &lum;
2869         }
2870
2871         lumk->lmm_oi = lsm->lsm_oi;
2872         lumk->lmm_stripe_count = 1;
2873
2874         if (copy_to_user(lump, lumk, lum_size))
2875                 rc = -EFAULT;
2876
2877         if (lumk != &lum)
2878                 OBD_FREE(lumk, lum_size);
2879
2880         return rc;
2881 }
2882
2883
2884 static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2885                          void *karg, void *uarg)
2886 {
2887         struct obd_device *obd = exp->exp_obd;
2888         struct obd_ioctl_data *data = karg;
2889         int err = 0;
2890
2891         if (!try_module_get(THIS_MODULE)) {
2892                 CERROR("Can't get module. Is it alive?\n");
2893                 return -EINVAL;
2894         }
2895         switch (cmd) {
2896         case OBD_IOC_LOV_GET_CONFIG: {
2897                 char *buf;
2898                 struct lov_desc *desc;
2899                 struct obd_uuid uuid;
2900
2901                 buf = NULL;
2902                 len = 0;
2903                 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
2904                         GOTO(out, err = -EINVAL);
2905
2906                 data = (struct obd_ioctl_data *)buf;
2907
2908                 if (sizeof(*desc) > data->ioc_inllen1) {
2909                         obd_ioctl_freedata(buf, len);
2910                         GOTO(out, err = -EINVAL);
2911                 }
2912
2913                 if (data->ioc_inllen2 < sizeof(uuid)) {
2914                         obd_ioctl_freedata(buf, len);
2915                         GOTO(out, err = -EINVAL);
2916                 }
2917
2918                 desc = (struct lov_desc *)data->ioc_inlbuf1;
2919                 desc->ld_tgt_count = 1;
2920                 desc->ld_active_tgt_count = 1;
2921                 desc->ld_default_stripe_count = 1;
2922                 desc->ld_default_stripe_size = 0;
2923                 desc->ld_default_stripe_offset = 0;
2924                 desc->ld_pattern = 0;
2925                 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
2926
2927                 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
2928
2929                 err = copy_to_user((void *)uarg, buf, len);
2930                 if (err)
2931                         err = -EFAULT;
2932                 obd_ioctl_freedata(buf, len);
2933                 GOTO(out, err);
2934         }
2935         case LL_IOC_LOV_SETSTRIPE:
2936                 err = obd_alloc_memmd(exp, karg);
2937                 if (err > 0)
2938                         err = 0;
2939                 GOTO(out, err);
2940         case LL_IOC_LOV_GETSTRIPE:
2941                 err = osc_getstripe(karg, uarg);
2942                 GOTO(out, err);
2943         case OBD_IOC_CLIENT_RECOVER:
2944                 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2945                                             data->ioc_inlbuf1, 0);
2946                 if (err > 0)
2947                         err = 0;
2948                 GOTO(out, err);
2949         case IOC_OSC_SET_ACTIVE:
2950                 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2951                                                data->ioc_offset);
2952                 GOTO(out, err);
2953         case OBD_IOC_POLL_QUOTACHECK:
2954                 err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
2955                 GOTO(out, err);
2956         case OBD_IOC_PING_TARGET:
2957                 err = ptlrpc_obd_ping(obd);
2958                 GOTO(out, err);
2959         default:
2960                 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
2961                        cmd, current_comm());
2962                 GOTO(out, err = -ENOTTY);
2963         }
2964 out:
2965         module_put(THIS_MODULE);
2966         return err;
2967 }
2968
2969 static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
2970                         obd_count keylen, void *key, __u32 *vallen, void *val,
2971                         struct lov_stripe_md *lsm)
2972 {
2973         if (!vallen || !val)
2974                 return -EFAULT;
2975
2976         if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
2977                 __u32 *stripe = val;
2978                 *vallen = sizeof(*stripe);
2979                 *stripe = 0;
2980                 return 0;
2981         } else if (KEY_IS(KEY_LAST_ID)) {
2982                 struct ptlrpc_request *req;
2983                 obd_id          *reply;
2984                 char              *tmp;
2985                 int                 rc;
2986
2987                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2988                                            &RQF_OST_GET_INFO_LAST_ID);
2989                 if (req == NULL)
2990                         return -ENOMEM;
2991
2992                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
2993                                      RCL_CLIENT, keylen);
2994                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
2995                 if (rc) {
2996                         ptlrpc_request_free(req);
2997                         return rc;
2998                 }
2999
3000                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3001                 memcpy(tmp, key, keylen);
3002
3003                 req->rq_no_delay = req->rq_no_resend = 1;
3004                 ptlrpc_request_set_replen(req);
3005                 rc = ptlrpc_queue_wait(req);
3006                 if (rc)
3007                         GOTO(out, rc);
3008
3009                 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
3010                 if (reply == NULL)
3011                         GOTO(out, rc = -EPROTO);
3012
3013                 *((obd_id *)val) = *reply;
3014         out:
3015                 ptlrpc_req_finished(req);
3016                 return rc;
3017         } else if (KEY_IS(KEY_FIEMAP)) {
3018                 struct ll_fiemap_info_key *fm_key =
3019                                 (struct ll_fiemap_info_key *)key;
3020                 struct ldlm_res_id       res_id;
3021                 ldlm_policy_data_t       policy;
3022                 struct lustre_handle     lockh;
3023                 ldlm_mode_t              mode = 0;
3024                 struct ptlrpc_request   *req;
3025                 struct ll_user_fiemap   *reply;
3026                 char                    *tmp;
3027                 int                      rc;
3028
3029                 if (!(fm_key->fiemap.fm_flags & FIEMAP_FLAG_SYNC))
3030                         goto skip_locking;
3031
3032                 policy.l_extent.start = fm_key->fiemap.fm_start &
3033                                                 CFS_PAGE_MASK;
3034
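                /*
                 * Round the lock extent end up to a page boundary; if doing
                 * so would run past OBD_OBJECT_EOF, simply lock out to the
                 * end of the object instead.
                 */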
3035                 if (OBD_OBJECT_EOF - fm_key->fiemap.fm_length <=
3036                     fm_key->fiemap.fm_start + PAGE_CACHE_SIZE - 1)
3037                         policy.l_extent.end = OBD_OBJECT_EOF;
3038                 else
3039                         policy.l_extent.end = (fm_key->fiemap.fm_start +
3040                                 fm_key->fiemap.fm_length +
3041                                 PAGE_CACHE_SIZE - 1) & CFS_PAGE_MASK;
3042
3043                 ostid_build_res_name(&fm_key->oa.o_oi, &res_id);
3044                 mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
3045                                        LDLM_FL_BLOCK_GRANTED |
3046                                        LDLM_FL_LVB_READY,
3047                                        &res_id, LDLM_EXTENT, &policy,
3048                                        LCK_PR | LCK_PW, &lockh, 0);
3049                 if (mode) { /* lock is cached on client */
3050                         if (mode != LCK_PR) {
3051                                 ldlm_lock_addref(&lockh, LCK_PR);
3052                                 ldlm_lock_decref(&lockh, LCK_PW);
3053                         }
3054                 } else { /* no cached lock, need to acquire it on the server side */
3055                         fm_key->oa.o_valid |= OBD_MD_FLFLAGS;
3056                         fm_key->oa.o_flags |= OBD_FL_SRVLOCK;
3057                 }
3058
3059 skip_locking:
3060                 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3061                                            &RQF_OST_GET_INFO_FIEMAP);
3062                 if (req == NULL)
3063                         GOTO(drop_lock, rc = -ENOMEM);
3064
3065                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
3066                                      RCL_CLIENT, keylen);
3067                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3068                                      RCL_CLIENT, *vallen);
3069                 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3070                                      RCL_SERVER, *vallen);
3071
3072                 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3073                 if (rc) {
3074                         ptlrpc_request_free(req);
3075                         GOTO(drop_lock, rc);
3076                 }
3077
3078                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
3079                 memcpy(tmp, key, keylen);
3080                 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3081                 memcpy(tmp, val, *vallen);
3082
3083                 ptlrpc_request_set_replen(req);
3084                 rc = ptlrpc_queue_wait(req);
3085                 if (rc)
3086                         GOTO(fini_req, rc);
3087
3088                 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3089                 if (reply == NULL)
3090                         GOTO(fini_req, rc = -EPROTO);
3091
3092                 memcpy(val, reply, *vallen);
3093 fini_req:
3094                 ptlrpc_req_finished(req);
3095 drop_lock:
3096                 if (mode)
3097                         ldlm_lock_decref(&lockh, LCK_PR);
3098                 return rc;
3099         }
3100
3101         return -EINVAL;
3102 }
3103
3104 static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
3105                               obd_count keylen, void *key, obd_count vallen,
3106                               void *val, struct ptlrpc_request_set *set)
3107 {
3108         struct ptlrpc_request *req;
3109         struct obd_device     *obd = exp->exp_obd;
3110         struct obd_import     *imp = class_exp2cliimp(exp);
3111         char              *tmp;
3112         int                 rc;
3113
3114         OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3115
3116         if (KEY_IS(KEY_CHECKSUM)) {
3117                 if (vallen != sizeof(int))
3118                         return -EINVAL;
3119                 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3120                 return 0;
3121         }
3122
3123         if (KEY_IS(KEY_SPTLRPC_CONF)) {
3124                 sptlrpc_conf_client_adapt(obd);
3125                 return 0;
3126         }
3127
3128         if (KEY_IS(KEY_FLUSH_CTX)) {
3129                 sptlrpc_import_flush_my_ctx(imp);
3130                 return 0;
3131         }
3132
3133         if (KEY_IS(KEY_CACHE_SET)) {
3134                 struct client_obd *cli = &obd->u.cli;
3135
3136                 LASSERT(cli->cl_cache == NULL); /* only once */
3137                 cli->cl_cache = (struct cl_client_cache *)val;
3138                 atomic_inc(&cli->cl_cache->ccc_users);
3139                 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
3140
3141                 /* add this osc into entity list */
3142                 LASSERT(list_empty(&cli->cl_lru_osc));
3143                 spin_lock(&cli->cl_cache->ccc_lru_lock);
3144                 list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
3145                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3146
3147                 return 0;
3148         }
3149
3150         if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
3151                 struct client_obd *cli = &obd->u.cli;
3152                 int nr = atomic_read(&cli->cl_lru_in_list) >> 1;
3153                 int target = *(int *)val;
3154
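                /*
                 * Shrink at most half of the pages currently on the LRU and
                 * never more than the caller asked for; e.g. with 1000 pages
                 * on the LRU and a target of 200, at most 200 pages are
                 * freed, while a target of 800 is capped at 500.  The number
                 * actually freed is subtracted from *val for the caller.
                 */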
3155                 nr = osc_lru_shrink(cli, min(nr, target));
3156                 *(int *)val -= nr;
3157                 return 0;
3158         }
3159
3160         if (!set && !KEY_IS(KEY_GRANT_SHRINK))
3161                 return -EINVAL;
3162
3163         /* We pass all other commands directly to OST. Since nobody calls osc
3164          * methods directly and everybody is supposed to go through LOV, we
3165          * assume lov checked invalid values for us.
3166          * The only recognised values so far are evict_by_nid and mds_conn.
3167          * Even if something bad goes through, we'd get a -EINVAL from OST
3168          * anyway. */
3169
3170         req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
3171                                                 &RQF_OST_SET_GRANT_INFO :
3172                                                 &RQF_OBD_SET_INFO);
3173         if (req == NULL)
3174                 return -ENOMEM;
3175
3176         req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3177                              RCL_CLIENT, keylen);
3178         if (!KEY_IS(KEY_GRANT_SHRINK))
3179                 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
3180                                      RCL_CLIENT, vallen);
3181         rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
3182         if (rc) {
3183                 ptlrpc_request_free(req);
3184                 return rc;
3185         }
3186
3187         tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3188         memcpy(tmp, key, keylen);
3189         tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
3190                                                         &RMF_OST_BODY :
3191                                                         &RMF_SETINFO_VAL);
3192         memcpy(tmp, val, vallen);
3193
3194         if (KEY_IS(KEY_GRANT_SHRINK)) {
3195                 struct osc_grant_args *aa;
3196                 struct obdo *oa;
3197
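                /*
                 * Stash a copy of the obdo in the async args so that the
                 * reply handler, osc_shrink_grant_interpret(), still has the
                 * grant information when the reply arrives.
                 */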
3198                 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
3199                 aa = ptlrpc_req_async_args(req);
3200                 OBDO_ALLOC(oa);
3201                 if (!oa) {
3202                         ptlrpc_req_finished(req);
3203                         return -ENOMEM;
3204                 }
3205                 *oa = ((struct ost_body *)val)->oa;
3206                 aa->aa_oa = oa;
3207                 req->rq_interpret_reply = osc_shrink_grant_interpret;
3208         }
3209
3210         ptlrpc_request_set_replen(req);
3211         if (!KEY_IS(KEY_GRANT_SHRINK)) {
3212                 LASSERT(set != NULL);
3213                 ptlrpc_set_add_req(set, req);
3214                 ptlrpc_check_set(NULL, set);
3215         } else
3216                 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
3217
3218         return 0;
3219 }
3220
3221
3222 static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
3223                          struct obd_device *disk_obd, int *index)
3224 {
3225         /* This code is not supposed to be used with LOD/OSP;
3226          * it will be removed soon. */
3227         LBUG();
3228         return 0;
3229 }
3230
3231 static int osc_llog_finish(struct obd_device *obd, int count)
3232 {
3233         struct llog_ctxt *ctxt;
3234
3235         ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3236         if (ctxt) {
3237                 llog_cat_close(NULL, ctxt->loc_handle);
3238                 llog_cleanup(NULL, ctxt);
3239         }
3240
3241         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3242         if (ctxt)
3243                 llog_cleanup(NULL, ctxt);
3244         return 0;
3245 }
3246
3247 static int osc_reconnect(const struct lu_env *env,
3248                          struct obd_export *exp, struct obd_device *obd,
3249                          struct obd_uuid *cluuid,
3250                          struct obd_connect_data *data,
3251                          void *localdata)
3252 {
3253         struct client_obd *cli = &obd->u.cli;
3254
3255         if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3256                 long lost_grant;
3257
3258                 client_obd_list_lock(&cli->cl_loi_list_lock);
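                /*
                 * Ask the server to honour the grant this client believes it
                 * still holds (available plus dirty); if that is zero, fall
                 * back to requesting two full BRW RPCs worth of grant.
                 */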
3259                 data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
3260                                 2 * cli_brw_size(obd);
3261                 lost_grant = cli->cl_lost_grant;
3262                 cli->cl_lost_grant = 0;
3263                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3264
3265                 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3266                        " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
3267                        data->ocd_version, data->ocd_grant, lost_grant);
3268         }
3269
3270         return 0;
3271 }
3272
3273 static int osc_disconnect(struct obd_export *exp)
3274 {
3275         struct obd_device *obd = class_exp2obd(exp);
3276         struct llog_ctxt  *ctxt;
3277         int rc;
3278
3279         ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3280         if (ctxt) {
3281                 if (obd->u.cli.cl_conn_count == 1) {
3282                         /* Flush any remaining cancel messages out to the
3283                          * target */
3284                         llog_sync(ctxt, exp, 0);
3285                 }
3286                 llog_ctxt_put(ctxt);
3287         } else {
3288                 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
3289                        obd);
3290         }
3291
3292         rc = client_disconnect_export(exp);
3293         /**
3294          * Initially we put del_shrink_grant before disconnect_export, but it
3295          * causes the following problem if setup (connect) and cleanup
3296          * (disconnect) are tangled together.
3297          *      connect p1                   disconnect p2
3298          *   ptlrpc_connect_import
3299          *     ...............         class_manual_cleanup
3300          *                                   osc_disconnect
3301          *                                   del_shrink_grant
3302          *   ptlrpc_connect_interrupt
3303          *     init_grant_shrink
3304          *   add this client to shrink list
3305          *                                    cleanup_osc
3306          * Bang! The pinger triggers the shrink.
3307          * So the osc should be removed from the shrink list only after we
3308          * are sure the import has been destroyed (bug 18662).
3309          */
3310         if (obd->u.cli.cl_import == NULL)
3311                 osc_del_shrink_grant(&obd->u.cli);
3312         return rc;
3313 }
3314
3315 static int osc_import_event(struct obd_device *obd,
3316                             struct obd_import *imp,
3317                             enum obd_import_event event)
3318 {
3319         struct client_obd *cli;
3320         int rc = 0;
3321
3322         LASSERT(imp->imp_obd == obd);
3323
3324         switch (event) {
3325         case IMP_EVENT_DISCON: {
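                /*
                 * The connection to the OST was lost: drop the grant
                 * accounting for this import; a fresh grant is negotiated
                 * when the import comes back (see osc_reconnect() and
                 * osc_init_grant()).
                 */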
3326                 cli = &obd->u.cli;
3327                 client_obd_list_lock(&cli->cl_loi_list_lock);
3328                 cli->cl_avail_grant = 0;
3329                 cli->cl_lost_grant = 0;
3330                 client_obd_list_unlock(&cli->cl_loi_list_lock);
3331                 break;
3332         }
3333         case IMP_EVENT_INACTIVE: {
3334                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3335                 break;
3336         }
3337         case IMP_EVENT_INVALIDATE: {
3338                 struct ldlm_namespace *ns = obd->obd_namespace;
3339                 struct lu_env    *env;
3340                 int                 refcheck;
3341
3342                 env = cl_env_get(&refcheck);
3343                 if (!IS_ERR(env)) {
3344                         /* Reset grants */
3345                         cli = &obd->u.cli;
3346                         /* all pages go to failing rpcs due to the invalid
3347                          * import */
3348                         osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);
3349
3350                         ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3351                         cl_env_put(env, &refcheck);
3352                 } else
3353                         rc = PTR_ERR(env);
3354                 break;
3355         }
3356         case IMP_EVENT_ACTIVE: {
3357                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3358                 break;
3359         }
3360         case IMP_EVENT_OCD: {
3361                 struct obd_connect_data *ocd = &imp->imp_connect_data;
3362
3363                 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3364                         osc_init_grant(&obd->u.cli, ocd);
3365
3366                 /* See bug 7198 */
3367                 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3368                         imp->imp_client->cli_request_portal = OST_REQUEST_PORTAL;
3369
3370                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3371                 break;
3372         }
3373         case IMP_EVENT_DEACTIVATE: {
3374                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
3375                 break;
3376         }
3377         case IMP_EVENT_ACTIVATE: {
3378                 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
3379                 break;
3380         }
3381         default:
3382                 CERROR("Unknown import event %d\n", event);
3383                 LBUG();
3384         }
3385         return rc;
3386 }
3387
3388 /**
3389  * Determine whether the lock can be canceled before replaying the lock
3390  * during recovery, see bug16774 for detailed information.
3391  *
3392  * \retval zero the lock can't be canceled
3393  * \retval other ok to cancel
3394  */
3395 static int osc_cancel_for_recovery(struct ldlm_lock *lock)
3396 {
3397         check_res_locked(lock->l_resource);
3398
3399         /*
3400          * Cancel all unused extent lock in granted mode LCK_PR or LCK_CR.
3401          *
3402          * XXX as a future improvement, we can also cancel unused write lock
3403          * if it doesn't have dirty data and active mmaps.
3404          */
3405         if (lock->l_resource->lr_type == LDLM_EXTENT &&
3406             (lock->l_granted_mode == LCK_PR ||
3407              lock->l_granted_mode == LCK_CR) &&
3408             (osc_dlm_lock_pageref(lock) == 0))
3409                 return 1;
3410
3411         return 0;
3412 }
3413
3414 static int brw_queue_work(const struct lu_env *env, void *data)
3415 {
3416         struct client_obd *cli = data;
3417
3418         CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3419
3420         osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
3421         return 0;
3422 }
3423
3424 int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3425 {
3426         struct lprocfs_static_vars lvars = { 0 };
3427         struct client_obd         *cli = &obd->u.cli;
3428         void                   *handler;
3429         int                     rc;
3430
3431         rc = ptlrpcd_addref();
3432         if (rc)
3433                 return rc;
3434
3435         rc = client_obd_setup(obd, lcfg);
3436         if (rc)
3437                 GOTO(out_ptlrpcd, rc);
3438
3439         handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
3440         if (IS_ERR(handler))
3441                 GOTO(out_client_setup, rc = PTR_ERR(handler));
3442         cli->cl_writeback_work = handler;
3443
3444         rc = osc_quota_setup(obd);
3445         if (rc)
3446                 GOTO(out_ptlrpcd_work, rc);
3447
3448         cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3449         lprocfs_osc_init_vars(&lvars);
3450         if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3451                 lproc_osc_attach_seqstat(obd);
3452                 sptlrpc_lprocfs_cliobd_attach(obd);
3453                 ptlrpc_lprocfs_register_obd(obd);
3454         }
3455
3456         /* We need to allocate a few extra requests, because
3457          * brw_interpret tries to create new requests before freeing
3458          * previous ones.  Ideally we would reserve 2x max_rpcs_in_flight,
3459          * but that might waste too much RAM, so reserving just two extra
3460          * requests is a guess that should still work. */
3461         cli->cl_import->imp_rq_pool =
3462                 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3463                                     OST_MAXREQSIZE,
3464                                     ptlrpc_add_rqs_to_pool);
3465
3466         INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
3467         ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
3468         return rc;
3469
3470 out_ptlrpcd_work:
3471         ptlrpcd_destroy_work(handler);
3472 out_client_setup:
3473         client_obd_cleanup(obd);
3474 out_ptlrpcd:
3475         ptlrpcd_decref();
3476         return rc;
3477 }
3478
3479 static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3480 {
3481         int rc = 0;
3482
3483         switch (stage) {
3484         case OBD_CLEANUP_EARLY: {
3485                 struct obd_import *imp;
3486                 imp = obd->u.cli.cl_import;
3487                 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3488                 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3489                 ptlrpc_deactivate_import(imp);
3490                 spin_lock(&imp->imp_lock);
3491                 imp->imp_pingable = 0;
3492                 spin_unlock(&imp->imp_lock);
3493                 break;
3494         }
3495         case OBD_CLEANUP_EXPORTS: {
3496                 struct client_obd *cli = &obd->u.cli;
3497                 /* LU-464
3498                  * for echo client, export may be on zombie list, wait for
3499                  * zombie thread to cull it, because cli.cl_import will be
3500                  * cleared in client_disconnect_export():
3501                  *   class_export_destroy() -> obd_cleanup() ->
3502                  *   echo_device_free() -> echo_client_cleanup() ->
3503                  *   obd_disconnect() -> osc_disconnect() ->
3504                  *   client_disconnect_export()
3505                  */
3506                 obd_zombie_barrier();
3507                 if (cli->cl_writeback_work) {
3508                         ptlrpcd_destroy_work(cli->cl_writeback_work);
3509                         cli->cl_writeback_work = NULL;
3510                 }
3511                 obd_cleanup_client_import(obd);
3512                 ptlrpc_lprocfs_unregister_obd(obd);
3513                 lprocfs_obd_cleanup(obd);
3514                 rc = obd_llog_finish(obd, 0);
3515                 if (rc != 0)
3516                         CERROR("failed to cleanup llogging subsystems\n");
3517                 break;
3518         }
3519         }
3520         return rc;
3521 }
3522
3523 int osc_cleanup(struct obd_device *obd)
3524 {
3525         struct client_obd *cli = &obd->u.cli;
3526         int rc;
3527
3528         /* lru cleanup */
3529         if (cli->cl_cache != NULL) {
3530                 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
3531                 spin_lock(&cli->cl_cache->ccc_lru_lock);
3532                 list_del_init(&cli->cl_lru_osc);
3533                 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3534                 cli->cl_lru_left = NULL;
3535                 atomic_dec(&cli->cl_cache->ccc_users);
3536                 cli->cl_cache = NULL;
3537         }
3538
3539         /* free memory of osc quota cache */
3540         osc_quota_cleanup(obd);
3541
3542         rc = client_obd_cleanup(obd);
3543
3544         ptlrpcd_decref();
3545         return rc;
3546 }
3547
3548 int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
3549 {
3550         struct lprocfs_static_vars lvars = { 0 };
3551         int rc = 0;
3552
3553         lprocfs_osc_init_vars(&lvars);
3554
3555         switch (lcfg->lcfg_command) {
3556         default:
3557                 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
3558                                               lcfg, obd);
3559                 if (rc > 0)
3560                         rc = 0;
3561                 break;
3562         }
3563
3564         return rc;
3565 }
3566
3567 static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
3568 {
3569         return osc_process_config_base(obd, buf);
3570 }
3571
3572 struct obd_ops osc_obd_ops = {
3573         .o_owner = THIS_MODULE,
3574         .o_setup = osc_setup,
3575         .o_precleanup = osc_precleanup,
3576         .o_cleanup = osc_cleanup,
3577         .o_add_conn = client_import_add_conn,
3578         .o_del_conn = client_import_del_conn,
3579         .o_connect = client_connect_import,
3580         .o_reconnect = osc_reconnect,
3581         .o_disconnect = osc_disconnect,
3582         .o_statfs = osc_statfs,
3583         .o_statfs_async = osc_statfs_async,
3584         .o_packmd = osc_packmd,
3585         .o_unpackmd = osc_unpackmd,
3586         .o_create = osc_create,
3587         .o_destroy = osc_destroy,
3588         .o_getattr = osc_getattr,
3589         .o_getattr_async = osc_getattr_async,
3590         .o_setattr = osc_setattr,
3591         .o_setattr_async = osc_setattr_async,
3592         .o_brw = osc_brw,
3593         .o_punch = osc_punch,
3594         .o_sync = osc_sync,
3595         .o_enqueue = osc_enqueue,
3596         .o_change_cbdata = osc_change_cbdata,
3597         .o_find_cbdata = osc_find_cbdata,
3598         .o_cancel = osc_cancel,
3599         .o_cancel_unused = osc_cancel_unused,
3600         .o_iocontrol = osc_iocontrol,
3601         .o_get_info = osc_get_info,
3602         .o_set_info_async = osc_set_info_async,
3603         .o_import_event = osc_import_event,
3604         .o_llog_init = osc_llog_init,
3605         .o_llog_finish = osc_llog_finish,
3606         .o_process_config = osc_process_config,
3607         .o_quotactl = osc_quotactl,
3608         .o_quotacheck = osc_quotacheck,
3609 };
3610
3611 extern struct lu_kmem_descr osc_caches[];
3612 extern spinlock_t osc_ast_guard;
3613 extern struct lock_class_key osc_ast_guard_class;
3614
3615 int __init osc_init(void)
3616 {
3617         struct lprocfs_static_vars lvars = { 0 };
3618         int rc;
3619
3620         /* print an address of _any_ initialized kernel symbol from this
3621          * module, to allow debugging with gdb that doesn't support data
3622          * symbols from modules. */
3623         CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3624
3625         rc = lu_kmem_init(osc_caches);
3626         if (rc)
3627                 return rc;
3628
3629         lprocfs_osc_init_vars(&lvars);
3630
3631         rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
3632                                  LUSTRE_OSC_NAME, &osc_device_type);
3633         if (rc) {
3634                 lu_kmem_fini(osc_caches);
3635                 return rc;
3636         }
3637
3638         spin_lock_init(&osc_ast_guard);
3639         lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
3640
3641         return rc;
3642 }
3643
3644 static void /*__exit*/ osc_exit(void)
3645 {
3646         class_unregister_type(LUSTRE_OSC_NAME);
3647         lu_kmem_fini(osc_caches);
3648 }
3649
3650 MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
3651 MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3652 MODULE_LICENSE("GPL");
3653 MODULE_VERSION(LUSTRE_VERSION_STRING);
3654
3655 module_init(osc_init);
3656 module_exit(osc_exit);