/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2002, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2012, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 */

#define DEBUG_SUBSYSTEM S_OSC

#include <linux/libcfs/libcfs.h>

#include <lustre_dlm.h>
#include <lustre_net.h>
#include <lustre/lustre_user.h>
#include <obd_cksum.h>
#include <obd_ost.h>
#include <obd_lov.h>

#ifdef __CYGWIN__
# include <ctype.h>
#endif

#include <lustre_ha.h>
#include <lprocfs_status.h>
#include <lustre_log.h>
#include <lustre_debug.h>
#include <lustre_param.h>
#include <lustre_fid.h>
#include "osc_internal.h"
#include "osc_cl_internal.h"

static void osc_release_ppga(struct brw_page **ppga, obd_count count);
static int brw_interpret(const struct lu_env *env,
			 struct ptlrpc_request *req, void *data, int rc);
int osc_cleanup(struct obd_device *obd);

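/*
 * osc_packmd() and osc_unpackmd() below follow the obd_{pack,unpack}md
 * calling convention, which can be read off the code: called with a NULL
 * buffer pointer they only report the required buffer size; called with a
 * buffer but no source object they free the buffer; otherwise they allocate
 * the buffer if needed and convert the object id between host order and the
 * little-endian on-disk/wire order.
 */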
/* Pack OSC object metadata for disk storage (LE byte order). */
static int osc_packmd(struct obd_export *exp, struct lov_mds_md **lmmp,
		      struct lov_stripe_md *lsm)
{
	int lmm_size;
	ENTRY;

	lmm_size = sizeof(**lmmp);
	if (lmmp == NULL)
		RETURN(lmm_size);

	if (*lmmp != NULL && lsm == NULL) {
		OBD_FREE(*lmmp, lmm_size);
		*lmmp = NULL;
		RETURN(0);
	} else if (unlikely(lsm != NULL && ostid_id(&lsm->lsm_oi) == 0)) {
		RETURN(-EBADF);
	}

	if (*lmmp == NULL) {
		OBD_ALLOC(*lmmp, lmm_size);
		if (*lmmp == NULL)
			RETURN(-ENOMEM);
	}

	if (lsm)
		ostid_cpu_to_le(&lsm->lsm_oi, &(*lmmp)->lmm_oi);

	RETURN(lmm_size);
}

/* Unpack OSC object metadata from disk storage (LE byte order). */
static int osc_unpackmd(struct obd_export *exp, struct lov_stripe_md **lsmp,
			struct lov_mds_md *lmm, int lmm_bytes)
{
	int lsm_size;
	struct obd_import *imp = class_exp2cliimp(exp);
	ENTRY;

	if (lmm != NULL) {
		if (lmm_bytes < sizeof(*lmm)) {
			CERROR("%s: lov_mds_md too small: %d, need %d\n",
			       exp->exp_obd->obd_name, lmm_bytes,
			       (int)sizeof(*lmm));
			RETURN(-EINVAL);
		}
		/* XXX LOV_MAGIC etc check? */

		if (unlikely(ostid_id(&lmm->lmm_oi) == 0)) {
			CERROR("%s: zero lmm_object_id: rc = %d\n",
			       exp->exp_obd->obd_name, -EINVAL);
			RETURN(-EINVAL);
		}
	}

	lsm_size = lov_stripe_md_size(1);
	if (lsmp == NULL)
		RETURN(lsm_size);

	if (*lsmp != NULL && lmm == NULL) {
		OBD_FREE((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
		OBD_FREE(*lsmp, lsm_size);
		*lsmp = NULL;
		RETURN(0);
	}

	if (*lsmp == NULL) {
		OBD_ALLOC(*lsmp, lsm_size);
		if (unlikely(*lsmp == NULL))
			RETURN(-ENOMEM);
		OBD_ALLOC((*lsmp)->lsm_oinfo[0], sizeof(struct lov_oinfo));
		if (unlikely((*lsmp)->lsm_oinfo[0] == NULL)) {
			OBD_FREE(*lsmp, lsm_size);
			RETURN(-ENOMEM);
		}
		loi_init((*lsmp)->lsm_oinfo[0]);
	} else if (unlikely(ostid_id(&(*lsmp)->lsm_oi) == 0)) {
		RETURN(-EBADF);
	}

	if (lmm != NULL)
		/* XXX zero *lsmp? */
		ostid_le_to_cpu(&lmm->lmm_oi, &(*lsmp)->lsm_oi);

	if (imp != NULL &&
	    (imp->imp_connect_data.ocd_connect_flags & OBD_CONNECT_MAXBYTES))
		(*lsmp)->lsm_maxbytes = imp->imp_connect_data.ocd_maxbytes;
	else
		(*lsmp)->lsm_maxbytes = LUSTRE_STRIPE_MAXBYTES;

	RETURN(lsm_size);
}

static inline void osc_pack_capa(struct ptlrpc_request *req,
				 struct ost_body *body, void *capa)
{
	struct obd_capa *oc = (struct obd_capa *)capa;
	struct lustre_capa *c;

	if (!capa)
		return;

	c = req_capsule_client_get(&req->rq_pill, &RMF_CAPA1);
	LASSERT(c);
	capa_cpy(c, oc);
	body->oa.o_valid |= OBD_MD_FLOSSCAPA;
	DEBUG_CAPA(D_SEC, c, "pack");
}

static inline void osc_pack_req_body(struct ptlrpc_request *req,
				     struct obd_info *oinfo)
{
	struct ost_body *body;

	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);

	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
			     oinfo->oi_oa);
	osc_pack_capa(req, body, oinfo->oi_capa);
}

static inline void osc_set_capa_size(struct ptlrpc_request *req,
				     const struct req_msg_field *field,
				     struct obd_capa *oc)
{
	if (oc == NULL)
		req_capsule_set_size(&req->rq_pill, field, RCL_CLIENT, 0);
	else
		/* it is already calculated as sizeof struct obd_capa */
		;
}

static int osc_getattr_interpret(const struct lu_env *env,
				 struct ptlrpc_request *req,
				 struct osc_async_args *aa, int rc)
{
	struct ost_body *body;
	ENTRY;

	if (rc != 0)
		GOTO(out, rc);

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body) {
		CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
		lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
				     aa->aa_oi->oi_oa, &body->oa);

		/* This should really be sent by the OST */
		aa->aa_oi->oi_oa->o_blksize = DT_MAX_BRW_SIZE;
		aa->aa_oi->oi_oa->o_valid |= OBD_MD_FLBLKSZ;
	} else {
		CDEBUG(D_INFO, "can't unpack ost_body\n");
		rc = -EPROTO;
		aa->aa_oi->oi_oa->o_valid = 0;
	}
out:
	rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
	RETURN(rc);
}

static int osc_getattr_async(struct obd_export *exp, struct obd_info *oinfo,
			     struct ptlrpc_request_set *set)
{
	struct ptlrpc_request *req;
	struct osc_async_args *aa;
	int rc;
	ENTRY;

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
	if (req == NULL)
		RETURN(-ENOMEM);

	osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
	if (rc) {
		ptlrpc_request_free(req);
		RETURN(rc);
	}

	osc_pack_req_body(req, oinfo);

	ptlrpc_request_set_replen(req);
	req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_getattr_interpret;

	CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
	aa = ptlrpc_req_async_args(req);
	aa->aa_oi = oinfo;

	ptlrpc_set_add_req(set, req);
	RETURN(0);
}

static int osc_getattr(const struct lu_env *env, struct obd_export *exp,
		       struct obd_info *oinfo)
{
	struct ptlrpc_request *req;
	struct ost_body *body;
	int rc;
	ENTRY;

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_GETATTR);
	if (req == NULL)
		RETURN(-ENOMEM);

	osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GETATTR);
	if (rc) {
		ptlrpc_request_free(req);
		RETURN(rc);
	}

	osc_pack_req_body(req, oinfo);

	ptlrpc_request_set_replen(req);

	rc = ptlrpc_queue_wait(req);
	if (rc)
		GOTO(out, rc);

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body == NULL)
		GOTO(out, rc = -EPROTO);

	CDEBUG(D_INODE, "mode: %o\n", body->oa.o_mode);
	lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
			     &body->oa);

	oinfo->oi_oa->o_blksize = cli_brw_size(exp->exp_obd);
	oinfo->oi_oa->o_valid |= OBD_MD_FLBLKSZ;

	EXIT;
 out:
	ptlrpc_req_finished(req);
	return rc;
}

static int osc_setattr(const struct lu_env *env, struct obd_export *exp,
		       struct obd_info *oinfo, struct obd_trans_info *oti)
{
	struct ptlrpc_request *req;
	struct ost_body *body;
	int rc;
	ENTRY;

	LASSERT(oinfo->oi_oa->o_valid & OBD_MD_FLGROUP);

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
	if (req == NULL)
		RETURN(-ENOMEM);

	osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
	if (rc) {
		ptlrpc_request_free(req);
		RETURN(rc);
	}

	osc_pack_req_body(req, oinfo);

	ptlrpc_request_set_replen(req);

	rc = ptlrpc_queue_wait(req);
	if (rc)
		GOTO(out, rc);

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body == NULL)
		GOTO(out, rc = -EPROTO);

	lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oinfo->oi_oa,
			     &body->oa);

	EXIT;
out:
	ptlrpc_req_finished(req);
	RETURN(rc);
}

static int osc_setattr_interpret(const struct lu_env *env,
				 struct ptlrpc_request *req,
				 struct osc_setattr_args *sa, int rc)
{
	struct ost_body *body;
	ENTRY;

	if (rc != 0)
		GOTO(out, rc);

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body == NULL)
		GOTO(out, rc = -EPROTO);

	lustre_get_wire_obdo(&req->rq_import->imp_connect_data, sa->sa_oa,
			     &body->oa);
out:
	rc = sa->sa_upcall(sa->sa_cookie, rc);
	RETURN(rc);
}

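/*
 * The setattr-style operations below share one pattern: pack an ost_body
 * from the caller's obdo, send the RPC, and report completion through an
 * (upcall, cookie) pair stored in the request's async args.  Passing
 * rqset == PTLRPCD_SET queues the request on the ptlrpcd daemons instead
 * of a caller-owned request set.
 */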
int osc_setattr_async_base(struct obd_export *exp, struct obd_info *oinfo,
			   struct obd_trans_info *oti,
			   obd_enqueue_update_f upcall, void *cookie,
			   struct ptlrpc_request_set *rqset)
{
	struct ptlrpc_request *req;
	struct osc_setattr_args *sa;
	int rc;
	ENTRY;

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SETATTR);
	if (req == NULL)
		RETURN(-ENOMEM);

	osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SETATTR);
	if (rc) {
		ptlrpc_request_free(req);
		RETURN(rc);
	}

	if (oti && oinfo->oi_oa->o_valid & OBD_MD_FLCOOKIE)
		oinfo->oi_oa->o_lcookie = *oti->oti_logcookies;

	osc_pack_req_body(req, oinfo);

	ptlrpc_request_set_replen(req);

	/* do MDS-to-OST setattr asynchronously */
	if (!rqset) {
		/* Do not wait for response. */
		ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
	} else {
		req->rq_interpret_reply =
			(ptlrpc_interpterer_t)osc_setattr_interpret;

		CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
		sa = ptlrpc_req_async_args(req);
		sa->sa_oa = oinfo->oi_oa;
		sa->sa_upcall = upcall;
		sa->sa_cookie = cookie;

		if (rqset == PTLRPCD_SET)
			ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
		else
			ptlrpc_set_add_req(rqset, req);
	}

	RETURN(0);
}

static int osc_setattr_async(struct obd_export *exp, struct obd_info *oinfo,
			     struct obd_trans_info *oti,
			     struct ptlrpc_request_set *rqset)
{
	return osc_setattr_async_base(exp, oinfo, oti,
				      oinfo->oi_cb_up, oinfo, rqset);
}

int osc_real_create(struct obd_export *exp, struct obdo *oa,
		    struct lov_stripe_md **ea, struct obd_trans_info *oti)
{
	struct ptlrpc_request *req;
	struct ost_body *body;
	struct lov_stripe_md *lsm;
	int rc;
	ENTRY;

	LASSERT(oa);
	LASSERT(ea);

	lsm = *ea;
	if (!lsm) {
		rc = obd_alloc_memmd(exp, &lsm);
		if (rc < 0)
			RETURN(rc);
	}

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_CREATE);
	if (req == NULL)
		GOTO(out, rc = -ENOMEM);

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_CREATE);
	if (rc) {
		ptlrpc_request_free(req);
		GOTO(out, rc);
	}

	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);

	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

	ptlrpc_request_set_replen(req);

	if ((oa->o_valid & OBD_MD_FLFLAGS) &&
	    oa->o_flags == OBD_FL_DELORPHAN) {
		DEBUG_REQ(D_HA, req,
			  "delorphan from OST integration");
		/* Don't resend the delorphan req */
		req->rq_no_resend = req->rq_no_delay = 1;
	}

	rc = ptlrpc_queue_wait(req);
	if (rc)
		GOTO(out_req, rc);

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body == NULL)
		GOTO(out_req, rc = -EPROTO);

	CDEBUG(D_INFO, "oa flags %x\n", oa->o_flags);
	lustre_get_wire_obdo(&req->rq_import->imp_connect_data, oa, &body->oa);

	oa->o_blksize = cli_brw_size(exp->exp_obd);
	oa->o_valid |= OBD_MD_FLBLKSZ;

	/* XXX LOV STACKING: the lsm that is passed to us from LOV does not
	 * have valid lsm_oinfo data structs, so don't go touching that.
	 * This needs to be fixed in a big way.
	 */
	lsm->lsm_oi = oa->o_oi;
	*ea = lsm;

	if (oti != NULL) {
		oti->oti_transno = lustre_msg_get_transno(req->rq_repmsg);

		if (oa->o_valid & OBD_MD_FLCOOKIE) {
			if (!oti->oti_logcookies)
				oti_alloc_cookies(oti, 1);
			*oti->oti_logcookies = oa->o_lcookie;
		}
	}

	CDEBUG(D_HA, "transno: "LPD64"\n",
	       lustre_msg_get_transno(req->rq_repmsg));
out_req:
	ptlrpc_req_finished(req);
out:
	if (rc && !*ea)
		obd_free_memmd(exp, &lsm);
	RETURN(rc);
}

int osc_punch_base(struct obd_export *exp, struct obd_info *oinfo,
		   obd_enqueue_update_f upcall, void *cookie,
		   struct ptlrpc_request_set *rqset)
{
	struct ptlrpc_request *req;
	struct osc_setattr_args *sa;
	struct ost_body *body;
	int rc;
	ENTRY;

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_PUNCH);
	if (req == NULL)
		RETURN(-ENOMEM);

	osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_PUNCH);
	if (rc) {
		ptlrpc_request_free(req);
		RETURN(rc);
	}
	req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
	ptlrpc_at_set_req_timeout(req);

	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);
	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
			     oinfo->oi_oa);
	osc_pack_capa(req, body, oinfo->oi_capa);

	ptlrpc_request_set_replen(req);

	req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_setattr_interpret;
	CLASSERT(sizeof(*sa) <= sizeof(req->rq_async_args));
	sa = ptlrpc_req_async_args(req);
	sa->sa_oa = oinfo->oi_oa;
	sa->sa_upcall = upcall;
	sa->sa_cookie = cookie;
	if (rqset == PTLRPCD_SET)
		ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
	else
		ptlrpc_set_add_req(rqset, req);

	RETURN(0);
}

static int osc_punch(const struct lu_env *env, struct obd_export *exp,
		     struct obd_info *oinfo, struct obd_trans_info *oti,
		     struct ptlrpc_request_set *rqset)
{
	oinfo->oi_oa->o_size = oinfo->oi_policy.l_extent.start;
	oinfo->oi_oa->o_blocks = oinfo->oi_policy.l_extent.end;
	oinfo->oi_oa->o_valid |= OBD_MD_FLSIZE | OBD_MD_FLBLOCKS;
	return osc_punch_base(exp, oinfo,
			      oinfo->oi_cb_up, oinfo, rqset);
}

static int osc_sync_interpret(const struct lu_env *env,
			      struct ptlrpc_request *req,
			      void *arg, int rc)
{
	struct osc_fsync_args *fa = arg;
	struct ost_body *body;
	ENTRY;

	if (rc)
		GOTO(out, rc);

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body == NULL) {
		CERROR("can't unpack ost_body\n");
		GOTO(out, rc = -EPROTO);
	}

	*fa->fa_oi->oi_oa = body->oa;
out:
	rc = fa->fa_upcall(fa->fa_cookie, rc);
	RETURN(rc);
}

int osc_sync_base(struct obd_export *exp, struct obd_info *oinfo,
		  obd_enqueue_update_f upcall, void *cookie,
		  struct ptlrpc_request_set *rqset)
{
	struct ptlrpc_request *req;
	struct ost_body *body;
	struct osc_fsync_args *fa;
	int rc;
	ENTRY;

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_SYNC);
	if (req == NULL)
		RETURN(-ENOMEM);

	osc_set_capa_size(req, &RMF_CAPA1, oinfo->oi_capa);
	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SYNC);
	if (rc) {
		ptlrpc_request_free(req);
		RETURN(rc);
	}

	/* overload the size and blocks fields in the oa with start/end */
	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);
	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa,
			     oinfo->oi_oa);
	osc_pack_capa(req, body, oinfo->oi_capa);

	ptlrpc_request_set_replen(req);
	req->rq_interpret_reply = osc_sync_interpret;

	CLASSERT(sizeof(*fa) <= sizeof(req->rq_async_args));
	fa = ptlrpc_req_async_args(req);
	fa->fa_oi = oinfo;
	fa->fa_upcall = upcall;
	fa->fa_cookie = cookie;

	if (rqset == PTLRPCD_SET)
		ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
	else
		ptlrpc_set_add_req(rqset, req);

	RETURN(0);
}

static int osc_sync(const struct lu_env *env, struct obd_export *exp,
		    struct obd_info *oinfo, obd_size start, obd_size end,
		    struct ptlrpc_request_set *set)
{
	ENTRY;

	if (!oinfo->oi_oa) {
		CDEBUG(D_INFO, "oa NULL\n");
		RETURN(-EINVAL);
	}

	oinfo->oi_oa->o_size = start;
	oinfo->oi_oa->o_blocks = end;
	oinfo->oi_oa->o_valid |= (OBD_MD_FLSIZE | OBD_MD_FLBLOCKS);

	RETURN(osc_sync_base(exp, oinfo, oinfo->oi_cb_up, oinfo, set));
}

/* Find and cancel locally locks matched by @mode in the resource found by
 * @objid. Found locks are added to the @cancels list. Returns the number of
 * locks added to the @cancels list. */
static int osc_resource_get_unused(struct obd_export *exp, struct obdo *oa,
				   struct list_head *cancels,
				   ldlm_mode_t mode, int lock_flags)
{
	struct ldlm_namespace *ns = exp->exp_obd->obd_namespace;
	struct ldlm_res_id res_id;
	struct ldlm_resource *res;
	int count;
	ENTRY;

	/* Return, i.e. cancel nothing, only if ELC is supported (flag in
	 * export) but disabled through procfs (flag in NS).
	 *
	 * This is distinct from the case where ELC was never supported, in
	 * which we still want to cancel locks in advance, just locally,
	 * without sending any RPC. */
	if (exp_connect_cancelset(exp) && !ns_connect_cancelset(ns))
		RETURN(0);

	ostid_build_res_name(&oa->o_oi, &res_id);
	res = ldlm_resource_get(ns, NULL, &res_id, 0, 0);
	if (res == NULL)
		RETURN(0);

	LDLM_RESOURCE_ADDREF(res);
	count = ldlm_cancel_resource_local(res, cancels, NULL, mode,
					   lock_flags, 0, NULL);
	LDLM_RESOURCE_DELREF(res);
	ldlm_resource_putref(res);
	RETURN(count);
}

static int osc_destroy_interpret(const struct lu_env *env,
				 struct ptlrpc_request *req, void *data,
				 int rc)
{
	struct client_obd *cli = &req->rq_import->imp_obd->u.cli;

	atomic_dec(&cli->cl_destroy_in_flight);
	wake_up(&cli->cl_destroy_waitq);
	return 0;
}

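/*
 * Throttle destroy RPCs: at most cl_max_rpcs_in_flight destroys may be
 * outstanding at once.  The counter is maintained with bare atomics, so a
 * failed reservation must re-check the counter after decrementing and wake
 * any waiter it may have raced with.
 */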
static int osc_can_send_destroy(struct client_obd *cli)
{
	if (atomic_inc_return(&cli->cl_destroy_in_flight) <=
	    cli->cl_max_rpcs_in_flight) {
		/* The destroy request can be sent */
		return 1;
	}
	if (atomic_dec_return(&cli->cl_destroy_in_flight) <
	    cli->cl_max_rpcs_in_flight) {
		/*
		 * The counter has been modified between the two atomic
		 * operations.
		 */
		wake_up(&cli->cl_destroy_waitq);
	}
	return 0;
}

int osc_create(const struct lu_env *env, struct obd_export *exp,
	       struct obdo *oa, struct lov_stripe_md **ea,
	       struct obd_trans_info *oti)
{
	int rc = 0;
	ENTRY;

	LASSERT(oa);
	LASSERT(ea);
	LASSERT(oa->o_valid & OBD_MD_FLGROUP);

	if ((oa->o_valid & OBD_MD_FLFLAGS) &&
	    oa->o_flags == OBD_FL_RECREATE_OBJS) {
		RETURN(osc_real_create(exp, oa, ea, oti));
	}

	if (!fid_seq_is_mdt(ostid_seq(&oa->o_oi)))
		RETURN(osc_real_create(exp, oa, ea, oti));

	/* we should not get here anymore */
	LBUG();

	RETURN(rc);
}

/* Destroy requests can always be async on the client, and we don't even really
 * care about the return code, since the client cannot do anything at all about
 * a destroy failure.
 * When the MDS is unlinking a filename, it saves the file objects into a
 * recovery llog, and these object records are cancelled when the OST reports
 * they were destroyed and sync'd to disk (i.e. transaction committed).
 * If the client dies, or the OST is down when the object should be destroyed,
 * the records are not cancelled, and when the OST next reconnects to the MDS,
 * it will retrieve the llog unlink logs and then send the log cancellation
 * cookies to the MDS after committing destroy transactions. */
static int osc_destroy(const struct lu_env *env, struct obd_export *exp,
		       struct obdo *oa, struct lov_stripe_md *ea,
		       struct obd_trans_info *oti, struct obd_export *md_export,
		       void *capa)
{
	struct client_obd *cli = &exp->exp_obd->u.cli;
	struct ptlrpc_request *req;
	struct ost_body *body;
	LIST_HEAD(cancels);
	int rc, count;
	ENTRY;

	if (!oa) {
		CDEBUG(D_INFO, "oa NULL\n");
		RETURN(-EINVAL);
	}

	count = osc_resource_get_unused(exp, oa, &cancels, LCK_PW,
					LDLM_FL_DISCARD_DATA);

	req = ptlrpc_request_alloc(class_exp2cliimp(exp), &RQF_OST_DESTROY);
	if (req == NULL) {
		ldlm_lock_list_put(&cancels, l_bl_ast, count);
		RETURN(-ENOMEM);
	}

	osc_set_capa_size(req, &RMF_CAPA1, (struct obd_capa *)capa);
	rc = ldlm_prep_elc_req(exp, req, LUSTRE_OST_VERSION, OST_DESTROY,
			       0, &cancels, count);
	if (rc) {
		ptlrpc_request_free(req);
		RETURN(rc);
	}

	req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
	ptlrpc_at_set_req_timeout(req);

	if (oti != NULL && oa->o_valid & OBD_MD_FLCOOKIE)
		oa->o_lcookie = *oti->oti_logcookies;
	body = req_capsule_client_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);
	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

	osc_pack_capa(req, body, (struct obd_capa *)capa);
	ptlrpc_request_set_replen(req);

	/* If osc_destroy is for destroying an unlink orphan (sent from MDT
	 * to OST), it must not block here, because the request might be
	 * triggered by ptlrpcd, and it is not good to block a ptlrpcd
	 * thread (b=16006) */
	if (!(oa->o_flags & OBD_FL_DELORPHAN)) {
		req->rq_interpret_reply = osc_destroy_interpret;
		if (!osc_can_send_destroy(cli)) {
			struct l_wait_info lwi = LWI_INTR(LWI_ON_SIGNAL_NOOP,
							  NULL);

			/*
			 * Wait until the number of on-going destroy RPCs drops
			 * under max_rpc_in_flight
			 */
			l_wait_event_exclusive(cli->cl_destroy_waitq,
					       osc_can_send_destroy(cli), &lwi);
		}
	}

	/* Do not wait for response */
	ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
	RETURN(0);
}

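/*
 * osc_announce_cached() piggybacks the client's cache state on each RPC:
 * o_dirty is the number of bytes currently cached dirty, o_undirty
 * advertises how much more the client is prepared to dirty, o_grant
 * returns the available plus reserved grant, and o_dropped reports grant
 * that was lost.  The sanity checks only zero o_undirty; they never fail
 * the I/O itself.
 */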
static void osc_announce_cached(struct client_obd *cli, struct obdo *oa,
				long writing_bytes)
{
	obd_flag bits = OBD_MD_FLBLOCKS | OBD_MD_FLGRANT;

	LASSERT(!(oa->o_valid & bits));

	oa->o_valid |= bits;
	client_obd_list_lock(&cli->cl_loi_list_lock);
	oa->o_dirty = cli->cl_dirty;
	if (unlikely(cli->cl_dirty - cli->cl_dirty_transit >
		     cli->cl_dirty_max)) {
		CERROR("dirty %lu - %lu > dirty_max %lu\n",
		       cli->cl_dirty, cli->cl_dirty_transit, cli->cl_dirty_max);
		oa->o_undirty = 0;
	} else if (unlikely(atomic_read(&obd_dirty_pages) -
			    atomic_read(&obd_dirty_transit_pages) >
			    (long)(obd_max_dirty_pages + 1))) {
		/* The atomic_read()s are not covered by a lock, thus they
		 * may race with concurrent atomic_inc()s and trip this
		 * CERROR() unless we add in a small fudge factor (+1). */
		CERROR("dirty %d - %d > system dirty_max %d\n",
		       atomic_read(&obd_dirty_pages),
		       atomic_read(&obd_dirty_transit_pages),
		       obd_max_dirty_pages);
		oa->o_undirty = 0;
	} else if (unlikely(cli->cl_dirty_max - cli->cl_dirty > 0x7fffffff)) {
		CERROR("dirty %lu - dirty_max %lu too big???\n",
		       cli->cl_dirty, cli->cl_dirty_max);
		oa->o_undirty = 0;
	} else {
		long max_in_flight = (cli->cl_max_pages_per_rpc <<
				      PAGE_CACHE_SHIFT) *
				     (cli->cl_max_rpcs_in_flight + 1);
		oa->o_undirty = max(cli->cl_dirty_max, max_in_flight);
	}
	oa->o_grant = cli->cl_avail_grant + cli->cl_reserved_grant;
	oa->o_dropped = cli->cl_lost_grant;
	cli->cl_lost_grant = 0;
	client_obd_list_unlock(&cli->cl_loi_list_lock);
	CDEBUG(D_CACHE, "dirty: "LPU64" undirty: %u dropped %u grant: "LPU64"\n",
	       oa->o_dirty, oa->o_undirty, oa->o_dropped, oa->o_grant);
}

void osc_update_next_shrink(struct client_obd *cli)
{
	cli->cl_next_shrink_grant =
		cfs_time_shift(cli->cl_grant_shrink_interval);
	CDEBUG(D_CACHE, "next time %ld to shrink grant \n",
	       cli->cl_next_shrink_grant);
}

static void __osc_update_grant(struct client_obd *cli, obd_size grant)
{
	client_obd_list_lock(&cli->cl_loi_list_lock);
	cli->cl_avail_grant += grant;
	client_obd_list_unlock(&cli->cl_loi_list_lock);
}

static void osc_update_grant(struct client_obd *cli, struct ost_body *body)
{
	if (body->oa.o_valid & OBD_MD_FLGRANT) {
		CDEBUG(D_CACHE, "got "LPU64" extra grant\n", body->oa.o_grant);
		__osc_update_grant(cli, body->oa.o_grant);
	}
}

static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
			      obd_count keylen, void *key, obd_count vallen,
			      void *val, struct ptlrpc_request_set *set);

static int osc_shrink_grant_interpret(const struct lu_env *env,
				      struct ptlrpc_request *req,
				      void *aa, int rc)
{
	struct client_obd *cli = &req->rq_import->imp_obd->u.cli;
	struct obdo *oa = ((struct osc_grant_args *)aa)->aa_oa;
	struct ost_body *body;

	if (rc != 0) {
		__osc_update_grant(cli, oa->o_grant);
		GOTO(out, rc);
	}

	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	LASSERT(body);
	osc_update_grant(cli, body);
out:
	OBDO_FREE(oa);
	return rc;
}

static void osc_shrink_grant_local(struct client_obd *cli, struct obdo *oa)
{
	client_obd_list_lock(&cli->cl_loi_list_lock);
	oa->o_grant = cli->cl_avail_grant / 4;
	cli->cl_avail_grant -= oa->o_grant;
	client_obd_list_unlock(&cli->cl_loi_list_lock);
	if (!(oa->o_valid & OBD_MD_FLFLAGS)) {
		oa->o_valid |= OBD_MD_FLFLAGS;
		oa->o_flags = 0;
	}
	oa->o_flags |= OBD_FL_SHRINK_GRANT;
	osc_update_next_shrink(cli);
}

/* Shrink the current grant, either from some large amount to enough for a
 * full set of in-flight RPCs, or if we have already shrunk to that limit
 * then to enough for a single RPC. This avoids keeping more grant than
 * needed, and avoids shrinking the grant piecemeal. */
static int osc_shrink_grant(struct client_obd *cli)
{
	__u64 target_bytes = (cli->cl_max_rpcs_in_flight + 1) *
			     (cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT);

	client_obd_list_lock(&cli->cl_loi_list_lock);
	if (cli->cl_avail_grant <= target_bytes)
		target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;
	client_obd_list_unlock(&cli->cl_loi_list_lock);

	return osc_shrink_grant_to_target(cli, target_bytes);
}

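/*
 * The actual shrink is communicated to the server as a KEY_GRANT_SHRINK
 * set_info RPC carrying an ost_body: the grant being returned is moved
 * out of cl_avail_grant up front, and is put back via __osc_update_grant()
 * if the RPC fails.
 */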
int osc_shrink_grant_to_target(struct client_obd *cli, __u64 target_bytes)
{
	int rc = 0;
	struct ost_body *body;
	ENTRY;

	client_obd_list_lock(&cli->cl_loi_list_lock);
	/* Don't shrink if we are already above or below the desired limit.
	 * We don't want to shrink below a single RPC, as that will negatively
	 * impact block allocation and long-term performance. */
	if (target_bytes < cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT)
		target_bytes = cli->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;

	if (target_bytes >= cli->cl_avail_grant) {
		client_obd_list_unlock(&cli->cl_loi_list_lock);
		RETURN(0);
	}
	client_obd_list_unlock(&cli->cl_loi_list_lock);

	OBD_ALLOC_PTR(body);
	if (!body)
		RETURN(-ENOMEM);

	osc_announce_cached(cli, &body->oa, 0);

	client_obd_list_lock(&cli->cl_loi_list_lock);
	body->oa.o_grant = cli->cl_avail_grant - target_bytes;
	cli->cl_avail_grant = target_bytes;
	client_obd_list_unlock(&cli->cl_loi_list_lock);
	if (!(body->oa.o_valid & OBD_MD_FLFLAGS)) {
		body->oa.o_valid |= OBD_MD_FLFLAGS;
		body->oa.o_flags = 0;
	}
	body->oa.o_flags |= OBD_FL_SHRINK_GRANT;
	osc_update_next_shrink(cli);

	rc = osc_set_info_async(NULL, cli->cl_import->imp_obd->obd_self_export,
				sizeof(KEY_GRANT_SHRINK), KEY_GRANT_SHRINK,
				sizeof(*body), body, NULL);
	if (rc != 0)
		__osc_update_grant(cli, body->oa.o_grant);
	OBD_FREE_PTR(body);
	RETURN(rc);
}

static int osc_should_shrink_grant(struct client_obd *client)
{
	cfs_time_t time = cfs_time_current();
	cfs_time_t next_shrink = client->cl_next_shrink_grant;

	if ((client->cl_import->imp_connect_data.ocd_connect_flags &
	     OBD_CONNECT_GRANT_SHRINK) == 0)
		return 0;

	if (cfs_time_aftereq(time, next_shrink - 5 * CFS_TICK)) {
		/* Get the current RPC size directly, instead of going via:
		 * cli_brw_size(obd->u.cli.cl_import->imp_obd->obd_self_export)
		 * Keep comment here so that it can be found by searching. */
		int brw_size = client->cl_max_pages_per_rpc << PAGE_CACHE_SHIFT;

		if (client->cl_import->imp_state == LUSTRE_IMP_FULL &&
		    client->cl_avail_grant > brw_size)
			return 1;
		else
			osc_update_next_shrink(client);
	}
	return 0;
}

static int osc_grant_shrink_grant_cb(struct timeout_item *item, void *data)
{
	struct client_obd *client;

	list_for_each_entry(client, &item->ti_obd_list,
			    cl_grant_shrink_list) {
		if (osc_should_shrink_grant(client))
			osc_shrink_grant(client);
	}
	return 0;
}

static int osc_add_shrink_grant(struct client_obd *client)
{
	int rc;

	rc = ptlrpc_add_timeout_client(client->cl_grant_shrink_interval,
				       TIMEOUT_GRANT,
				       osc_grant_shrink_grant_cb, NULL,
				       &client->cl_grant_shrink_list);
	if (rc) {
		CERROR("add grant client %s error %d\n",
		       client->cl_import->imp_obd->obd_name, rc);
		return rc;
	}
	CDEBUG(D_CACHE, "add grant client %s \n",
	       client->cl_import->imp_obd->obd_name);
	osc_update_next_shrink(client);
	return 0;
}

static int osc_del_shrink_grant(struct client_obd *client)
{
	return ptlrpc_del_timeout_client(&client->cl_grant_shrink_list,
					 TIMEOUT_GRANT);
}

static void osc_init_grant(struct client_obd *cli, struct obd_connect_data *ocd)
{
	/*
	 * ocd_grant is the total grant amount we expect to hold: if we have
	 * been evicted, it's the new avail_grant amount, and cl_dirty will
	 * drop to 0 as inflight RPCs fail out; otherwise, it's
	 * avail_grant + dirty.
	 *
	 * The race is tolerable here: if we're evicted, but imp_state already
	 * left EVICTED state, then cl_dirty must be 0 already.
	 */
	client_obd_list_lock(&cli->cl_loi_list_lock);
	if (cli->cl_import->imp_state == LUSTRE_IMP_EVICTED)
		cli->cl_avail_grant = ocd->ocd_grant;
	else
		cli->cl_avail_grant = ocd->ocd_grant - cli->cl_dirty;

	if (cli->cl_avail_grant < 0) {
		CWARN("%s: available grant < 0: avail/ocd/dirty %ld/%u/%ld\n",
		      cli->cl_import->imp_obd->obd_name, cli->cl_avail_grant,
		      ocd->ocd_grant, cli->cl_dirty);
		/* workaround for servers which do not have the patch from
		 * LU-2679 */
		cli->cl_avail_grant = ocd->ocd_grant;
	}

	/* determine the appropriate chunk size used by osc_extent. */
	cli->cl_chunkbits = max_t(int, PAGE_CACHE_SHIFT, ocd->ocd_blocksize);
	client_obd_list_unlock(&cli->cl_loi_list_lock);

	CDEBUG(D_CACHE, "%s, setting cl_avail_grant: %ld cl_lost_grant: %ld."
	       "chunk bits: %d.\n", cli->cl_import->imp_obd->obd_name,
	       cli->cl_avail_grant, cli->cl_lost_grant, cli->cl_chunkbits);

	if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT_SHRINK &&
	    list_empty(&cli->cl_grant_shrink_list))
		osc_add_shrink_grant(cli);
}

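/*
 * Worked example: a 3-page read (pga counts 4096/4096/4096) that transfers
 * only 6000 bytes consumes page 0 and leaves nob_read == 1904 inside
 * page 1; the tail of page 1 beyond offset 1904 and all of page 2 are
 * zeroed below.
 */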
/* We assume that the reason this OSC got a short read is because it read
 * beyond the end of a stripe file; i.e. lustre is reading a sparse file
 * via the LOV, and it _knows_ it's reading inside the file, it's just that
 * this stripe never got written at or beyond this stripe offset yet. */
static void handle_short_read(int nob_read, obd_count page_count,
			      struct brw_page **pga)
{
	char *ptr;
	int i = 0;

	/* skip bytes read OK */
	while (nob_read > 0) {
		LASSERT(page_count > 0);

		if (pga[i]->count > nob_read) {
			/* EOF inside this page */
			ptr = kmap(pga[i]->pg) +
			      (pga[i]->off & ~CFS_PAGE_MASK);
			memset(ptr + nob_read, 0, pga[i]->count - nob_read);
			kunmap(pga[i]->pg);
			page_count--;
			i++;
			break;
		}

		nob_read -= pga[i]->count;
		page_count--;
		i++;
	}

	/* zero remaining pages */
	while (page_count-- > 0) {
		ptr = kmap(pga[i]->pg) + (pga[i]->off & ~CFS_PAGE_MASK);
		memset(ptr, 0, pga[i]->count);
		kunmap(pga[i]->pg);
		i++;
	}
}

static int check_write_rcs(struct ptlrpc_request *req,
			   int requested_nob, int niocount,
			   obd_count page_count, struct brw_page **pga)
{
	int i;
	__u32 *remote_rcs;

	remote_rcs = req_capsule_server_sized_get(&req->rq_pill, &RMF_RCS,
						  sizeof(*remote_rcs) *
						  niocount);
	if (remote_rcs == NULL) {
		CDEBUG(D_INFO, "Missing/short RC vector on BRW_WRITE reply\n");
		return(-EPROTO);
	}

	/* return error if any niobuf was in error */
	for (i = 0; i < niocount; i++) {
		if ((int)remote_rcs[i] < 0)
			return(remote_rcs[i]);

		if (remote_rcs[i] != 0) {
			CDEBUG(D_INFO, "rc[%d] invalid (%d) req %p\n",
			       i, remote_rcs[i], req);
			return(-EPROTO);
		}
	}

	if (req->rq_bulk->bd_nob_transferred != requested_nob) {
		CERROR("Unexpected # bytes transferred: %d (requested %d)\n",
		       req->rq_bulk->bd_nob_transferred, requested_nob);
		return(-EPROTO);
	}

	return (0);
}

static inline int can_merge_pages(struct brw_page *p1, struct brw_page *p2)
{
	if (p1->flag != p2->flag) {
		unsigned mask = ~(OBD_BRW_FROM_GRANT | OBD_BRW_NOCACHE |
				  OBD_BRW_SYNC | OBD_BRW_ASYNC |
				  OBD_BRW_NOQUOTA);

		/* warn if we try to combine flags that we don't know to be
		 * safe to combine */
		if (unlikely((p1->flag & mask) != (p2->flag & mask))) {
			CWARN("Saw flags 0x%x and 0x%x in the same brw, please "
			      "report this at http://bugs.whamcloud.com/\n",
			      p1->flag, p2->flag);
		}
		return 0;
	}

	return (p1->off + p1->count == p2->off);
}

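/*
 * The bulk checksum walks every page fragment of the request through a
 * libcfs crypto hash.  The two OBD_FAIL hooks below deliberately corrupt
 * a read (the data itself) or a write (the checksum only) so the checksum
 * recovery paths can be exercised without real corruption.
 */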
static obd_count osc_checksum_bulk(int nob, obd_count pg_count,
				   struct brw_page **pga, int opc,
				   cksum_type_t cksum_type)
{
	__u32 cksum;
	int i = 0;
	struct cfs_crypto_hash_desc *hdesc;
	unsigned int bufsize;
	int err;
	unsigned char cfs_alg = cksum_obd2cfs(cksum_type);

	LASSERT(pg_count > 0);

	hdesc = cfs_crypto_hash_init(cfs_alg, NULL, 0);
	if (IS_ERR(hdesc)) {
		CERROR("Unable to initialize checksum hash %s\n",
		       cfs_crypto_hash_name(cfs_alg));
		return PTR_ERR(hdesc);
	}

	while (nob > 0 && pg_count > 0) {
		int count = pga[i]->count > nob ? nob : pga[i]->count;

		/* corrupt the data before we compute the checksum, to
		 * simulate an OST->client data error */
		if (i == 0 && opc == OST_READ &&
		    OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_RECEIVE)) {
			unsigned char *ptr = kmap(pga[i]->pg);
			int off = pga[i]->off & ~CFS_PAGE_MASK;
			memcpy(ptr + off, "bad1", min(4, nob));
			kunmap(pga[i]->pg);
		}
		cfs_crypto_hash_update_page(hdesc, pga[i]->pg,
					    pga[i]->off & ~CFS_PAGE_MASK,
					    count);
		LL_CDEBUG_PAGE(D_PAGE, pga[i]->pg, "off %d\n",
			       (int)(pga[i]->off & ~CFS_PAGE_MASK));

		nob -= pga[i]->count;
		pg_count--;
		i++;
	}

	bufsize = 4;
	err = cfs_crypto_hash_final(hdesc, (unsigned char *)&cksum, &bufsize);

	if (err)
		cfs_crypto_hash_final(hdesc, NULL, NULL);

	/* For sending we only compute the wrong checksum instead
	 * of corrupting the data so it is still correct on a redo */
	if (opc == OST_WRITE && OBD_FAIL_CHECK(OBD_FAIL_OSC_CHECKSUM_SEND))
		cksum++;

	return cksum;
}

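/*
 * Building a BRW request: count the mergeable page runs to size the niobuf
 * array, pack body/ioobj/niobufs, attach every page to the bulk descriptor,
 * piggyback cached-state (and, for writes, checksum) data, then park the
 * async args that osc_brw_fini_request()/brw_interpret() will consume.
 */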
static int osc_brw_prep_request(int cmd, struct client_obd *cli, struct obdo *oa,
				struct lov_stripe_md *lsm, obd_count page_count,
				struct brw_page **pga,
				struct ptlrpc_request **reqp,
				struct obd_capa *ocapa, int reserve,
				int resend)
{
	struct ptlrpc_request *req;
	struct ptlrpc_bulk_desc *desc;
	struct ost_body *body;
	struct obd_ioobj *ioobj;
	struct niobuf_remote *niobuf;
	int niocount, i, requested_nob, opc, rc;
	struct osc_brw_async_args *aa;
	struct req_capsule *pill;
	struct brw_page *pg_prev;

	ENTRY;
	if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ))
		RETURN(-ENOMEM); /* Recoverable */
	if (OBD_FAIL_CHECK(OBD_FAIL_OSC_BRW_PREP_REQ2))
		RETURN(-EINVAL); /* Fatal */

	if ((cmd & OBD_BRW_WRITE) != 0) {
		opc = OST_WRITE;
		req = ptlrpc_request_alloc_pool(cli->cl_import,
						cli->cl_import->imp_rq_pool,
						&RQF_OST_BRW_WRITE);
	} else {
		opc = OST_READ;
		req = ptlrpc_request_alloc(cli->cl_import, &RQF_OST_BRW_READ);
	}
	if (req == NULL)
		RETURN(-ENOMEM);

	for (niocount = i = 1; i < page_count; i++) {
		if (!can_merge_pages(pga[i - 1], pga[i]))
			niocount++;
	}

	pill = &req->rq_pill;
	req_capsule_set_size(pill, &RMF_OBD_IOOBJ, RCL_CLIENT,
			     sizeof(*ioobj));
	req_capsule_set_size(pill, &RMF_NIOBUF_REMOTE, RCL_CLIENT,
			     niocount * sizeof(*niobuf));
	osc_set_capa_size(req, &RMF_CAPA1, ocapa);

	rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, opc);
	if (rc) {
		ptlrpc_request_free(req);
		RETURN(rc);
	}
	req->rq_request_portal = OST_IO_PORTAL; /* bug 7198 */
	ptlrpc_at_set_req_timeout(req);
	/* ask ptlrpc not to resend on EINPROGRESS since BRWs have their own
	 * retry logic */
	req->rq_no_retry_einprogress = 1;

	desc = ptlrpc_prep_bulk_imp(req, page_count,
		cli->cl_import->imp_connect_data.ocd_brw_size >> LNET_MTU_BITS,
		opc == OST_WRITE ? BULK_GET_SOURCE : BULK_PUT_SINK,
		OST_BULK_PORTAL);

	if (desc == NULL)
		GOTO(out, rc = -ENOMEM);
	/* NB request now owns desc and will free it when it gets freed */

	body = req_capsule_client_get(pill, &RMF_OST_BODY);
	ioobj = req_capsule_client_get(pill, &RMF_OBD_IOOBJ);
	niobuf = req_capsule_client_get(pill, &RMF_NIOBUF_REMOTE);
	LASSERT(body != NULL && ioobj != NULL && niobuf != NULL);

	lustre_set_wire_obdo(&req->rq_import->imp_connect_data, &body->oa, oa);

	obdo_to_ioobj(oa, ioobj);
	ioobj->ioo_bufcnt = niocount;
	/* The high bits of ioo_max_brw tell the server the _maximum_ number
	 * of bulks that might be sent for this request. The actual number is
	 * decided when the RPC is finally sent in ptlrpc_register_bulk(). It
	 * sends "max - 1" for compatibility with old clients sending "0", and
	 * also so the actual maximum is a power-of-two number, not one less.
	 * LU-1431 */
	ioobj_max_brw_set(ioobj, desc->bd_md_max_brw);
	osc_pack_capa(req, body, ocapa);
	LASSERT(page_count > 0);
	pg_prev = pga[0];
	for (requested_nob = i = 0; i < page_count; i++, niobuf++) {
		struct brw_page *pg = pga[i];
		int poff = pg->off & ~CFS_PAGE_MASK;

		LASSERT(pg->count > 0);
		/* make sure there is no gap in the middle of page array */
		LASSERTF(page_count == 1 ||
			 (ergo(i == 0, poff + pg->count == PAGE_CACHE_SIZE) &&
			  ergo(i > 0 && i < page_count - 1,
			       poff == 0 && pg->count == PAGE_CACHE_SIZE) &&
			  ergo(i == page_count - 1, poff == 0)),
			 "i: %d/%d pg: %p off: "LPU64", count: %u\n",
			 i, page_count, pg, pg->off, pg->count);
		LASSERTF(i == 0 || pg->off > pg_prev->off,
			 "i %d p_c %u pg %p [pri %lu ind %lu] off "LPU64
			 " prev_pg %p [pri %lu ind %lu] off "LPU64"\n",
			 i, page_count,
			 pg->pg, page_private(pg->pg), pg->pg->index, pg->off,
			 pg_prev->pg, page_private(pg_prev->pg),
			 pg_prev->pg->index, pg_prev->off);
		LASSERT((pga[0]->flag & OBD_BRW_SRVLOCK) ==
			(pg->flag & OBD_BRW_SRVLOCK));

		ptlrpc_prep_bulk_page_pin(desc, pg->pg, poff, pg->count);
		requested_nob += pg->count;

		if (i > 0 && can_merge_pages(pg_prev, pg)) {
			niobuf--;
			niobuf->len += pg->count;
		} else {
			niobuf->offset = pg->off;
			niobuf->len = pg->count;
			niobuf->flags = pg->flag;
		}
		pg_prev = pg;
	}

	LASSERTF((void *)(niobuf - niocount) ==
		 req_capsule_client_get(&req->rq_pill, &RMF_NIOBUF_REMOTE),
		 "want %p - real %p\n", req_capsule_client_get(&req->rq_pill,
		 &RMF_NIOBUF_REMOTE), (void *)(niobuf - niocount));

	osc_announce_cached(cli, &body->oa, opc == OST_WRITE ? requested_nob : 0);
	if (resend) {
		if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
			body->oa.o_valid |= OBD_MD_FLFLAGS;
			body->oa.o_flags = 0;
		}
		body->oa.o_flags |= OBD_FL_RECOV_RESEND;
	}

	if (osc_should_shrink_grant(cli))
		osc_shrink_grant_local(cli, &body->oa);

	/* size[REQ_REC_OFF] still sizeof (*body) */
	if (opc == OST_WRITE) {
		if (cli->cl_checksum &&
		    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
			/* store cl_cksum_type in a local variable since
			 * it can be changed via lprocfs */
			cksum_type_t cksum_type = cli->cl_cksum_type;

			if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0) {
				oa->o_flags &= OBD_FL_LOCAL_MASK;
				body->oa.o_flags = 0;
			}
			body->oa.o_flags |= cksum_type_pack(cksum_type);
			body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
			body->oa.o_cksum = osc_checksum_bulk(requested_nob,
							     page_count, pga,
							     OST_WRITE,
							     cksum_type);
			CDEBUG(D_PAGE, "checksum at write origin: %x\n",
			       body->oa.o_cksum);
			/* save this in 'oa', too, for later checking */
			oa->o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
			oa->o_flags |= cksum_type_pack(cksum_type);
		} else {
			/* clear out the checksum flag, in case this is a
			 * resend but cl_checksum is no longer set. b=11238 */
			oa->o_valid &= ~OBD_MD_FLCKSUM;
		}
		oa->o_cksum = body->oa.o_cksum;
		/* 1 RC per niobuf */
		req_capsule_set_size(pill, &RMF_RCS, RCL_SERVER,
				     sizeof(__u32) * niocount);
	} else {
		if (cli->cl_checksum &&
		    !sptlrpc_flavor_has_bulk(&req->rq_flvr)) {
			if ((body->oa.o_valid & OBD_MD_FLFLAGS) == 0)
				body->oa.o_flags = 0;
			body->oa.o_flags |= cksum_type_pack(cli->cl_cksum_type);
			body->oa.o_valid |= OBD_MD_FLCKSUM | OBD_MD_FLFLAGS;
		}
	}
	ptlrpc_request_set_replen(req);

	CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
	aa = ptlrpc_req_async_args(req);
	aa->aa_oa = oa;
	aa->aa_requested_nob = requested_nob;
	aa->aa_nio_count = niocount;
	aa->aa_page_count = page_count;
	aa->aa_resends = 0;
	aa->aa_ppga = pga;
	aa->aa_cli = cli;
	INIT_LIST_HEAD(&aa->aa_oaps);
	if (ocapa && reserve)
		aa->aa_ocapa = capa_get(ocapa);

	*reqp = req;
	RETURN(0);

 out:
	ptlrpc_req_finished(req);
	RETURN(rc);
}

static int check_write_checksum(struct obdo *oa, const lnet_process_id_t *peer,
				__u32 client_cksum, __u32 server_cksum, int nob,
				obd_count page_count, struct brw_page **pga,
				cksum_type_t client_cksum_type)
{
	__u32 new_cksum;
	char *msg;
	cksum_type_t cksum_type;

	if (server_cksum == client_cksum) {
		CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
		return 0;
	}

	cksum_type = cksum_type_unpack(oa->o_valid & OBD_MD_FLFLAGS ?
				       oa->o_flags : 0);
	new_cksum = osc_checksum_bulk(nob, page_count, pga, OST_WRITE,
				      cksum_type);

	if (cksum_type != client_cksum_type)
		msg = "the server did not use the checksum type specified in "
		      "the original request - likely a protocol problem";
	else if (new_cksum == server_cksum)
		msg = "changed on the client after we checksummed it - "
		      "likely false positive due to mmap IO (bug 11742)";
	else if (new_cksum == client_cksum)
		msg = "changed in transit before arrival at OST";
	else
		msg = "changed in transit AND doesn't match the original - "
		      "likely false positive due to mmap IO (bug 11742)";

	LCONSOLE_ERROR_MSG(0x132, "BAD WRITE CHECKSUM: %s: from %s inode "DFID
			   " object "DOSTID" extent ["LPU64"-"LPU64"]\n",
			   msg, libcfs_nid2str(peer->nid),
			   oa->o_valid & OBD_MD_FLFID ? oa->o_parent_seq : (__u64)0,
			   oa->o_valid & OBD_MD_FLFID ? oa->o_parent_oid : 0,
			   oa->o_valid & OBD_MD_FLFID ? oa->o_parent_ver : 0,
			   POSTID(&oa->o_oi), pga[0]->off,
			   pga[page_count-1]->off + pga[page_count-1]->count - 1);
	CERROR("original client csum %x (type %x), server csum %x (type %x), "
	       "client csum now %x\n", client_cksum, client_cksum_type,
	       server_cksum, cksum_type, new_cksum);
	return 1;
}

/* Note rc enters this function as number of bytes transferred */
static int osc_brw_fini_request(struct ptlrpc_request *req, int rc)
{
	struct osc_brw_async_args *aa = (void *)&req->rq_async_args;
	const lnet_process_id_t *peer =
			&req->rq_import->imp_connection->c_peer;
	struct client_obd *cli = aa->aa_cli;
	struct ost_body *body;
	__u32 client_cksum = 0;
	ENTRY;

	if (rc < 0 && rc != -EDQUOT) {
		DEBUG_REQ(D_INFO, req, "Failed request with rc = %d\n", rc);
		RETURN(rc);
	}

	LASSERTF(req->rq_repmsg != NULL, "rc = %d\n", rc);
	body = req_capsule_server_get(&req->rq_pill, &RMF_OST_BODY);
	if (body == NULL) {
		DEBUG_REQ(D_INFO, req, "Can't unpack body\n");
		RETURN(-EPROTO);
	}

	/* set/clear over quota flag for a uid/gid */
	if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE &&
	    body->oa.o_valid & (OBD_MD_FLUSRQUOTA | OBD_MD_FLGRPQUOTA)) {
		unsigned int qid[MAXQUOTAS] = { body->oa.o_uid, body->oa.o_gid };

		CDEBUG(D_QUOTA, "setdq for [%u %u] with valid "LPX64", flags %x\n",
		       body->oa.o_uid, body->oa.o_gid, body->oa.o_valid,
		       body->oa.o_flags);
		osc_quota_setdq(cli, qid, body->oa.o_valid, body->oa.o_flags);
	}

	osc_update_grant(cli, body);

	if (rc < 0)
		RETURN(rc);

	if (aa->aa_oa->o_valid & OBD_MD_FLCKSUM)
		client_cksum = aa->aa_oa->o_cksum; /* save for later */

	if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE) {
		if (rc > 0) {
			CERROR("Unexpected +ve rc %d\n", rc);
			RETURN(-EPROTO);
		}
		LASSERT(req->rq_bulk->bd_nob == aa->aa_requested_nob);

		if (sptlrpc_cli_unwrap_bulk_write(req, req->rq_bulk))
			RETURN(-EAGAIN);

		if ((aa->aa_oa->o_valid & OBD_MD_FLCKSUM) && client_cksum &&
		    check_write_checksum(&body->oa, peer, client_cksum,
					 body->oa.o_cksum, aa->aa_requested_nob,
					 aa->aa_page_count, aa->aa_ppga,
					 cksum_type_unpack(aa->aa_oa->o_flags)))
			RETURN(-EAGAIN);

		rc = check_write_rcs(req, aa->aa_requested_nob, aa->aa_nio_count,
				     aa->aa_page_count, aa->aa_ppga);
		GOTO(out, rc);
	}

	/* The rest of this function executes only for OST_READs */

	/* if unwrap_bulk failed, return -EAGAIN to retry */
	rc = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, rc);
	if (rc < 0)
		GOTO(out, rc = -EAGAIN);

	if (rc > aa->aa_requested_nob) {
		CERROR("Unexpected rc %d (%d requested)\n", rc,
		       aa->aa_requested_nob);
		RETURN(-EPROTO);
	}

	if (rc != req->rq_bulk->bd_nob_transferred) {
		CERROR("Unexpected rc %d (%d transferred)\n",
		       rc, req->rq_bulk->bd_nob_transferred);
		return (-EPROTO);
	}

	if (rc < aa->aa_requested_nob)
		handle_short_read(rc, aa->aa_page_count, aa->aa_ppga);

	if (body->oa.o_valid & OBD_MD_FLCKSUM) {
		static int cksum_counter;
		__u32 server_cksum = body->oa.o_cksum;
		char *via;
		char *router;
		cksum_type_t cksum_type;

		cksum_type = cksum_type_unpack(body->oa.o_valid & OBD_MD_FLFLAGS ?
					       body->oa.o_flags : 0);
		client_cksum = osc_checksum_bulk(rc, aa->aa_page_count,
						 aa->aa_ppga, OST_READ,
						 cksum_type);

		if (peer->nid == req->rq_bulk->bd_sender) {
			via = router = "";
		} else {
			via = " via ";
			router = libcfs_nid2str(req->rq_bulk->bd_sender);
		}

		if (server_cksum == ~0 && rc > 0) {
			CERROR("Protocol error: server %s set the 'checksum' "
			       "bit, but didn't send a checksum.  Not fatal, "
			       "but please notify on http://bugs.whamcloud.com/\n",
			       libcfs_nid2str(peer->nid));
		} else if (server_cksum != client_cksum) {
			LCONSOLE_ERROR_MSG(0x133, "%s: BAD READ CHECKSUM: from "
					   "%s%s%s inode "DFID" object "DOSTID
					   " extent ["LPU64"-"LPU64"]\n",
					   req->rq_import->imp_obd->obd_name,
					   libcfs_nid2str(peer->nid),
					   via, router,
					   body->oa.o_valid & OBD_MD_FLFID ?
						body->oa.o_parent_seq : (__u64)0,
					   body->oa.o_valid & OBD_MD_FLFID ?
						body->oa.o_parent_oid : 0,
					   body->oa.o_valid & OBD_MD_FLFID ?
						body->oa.o_parent_ver : 0,
					   POSTID(&body->oa.o_oi),
					   aa->aa_ppga[0]->off,
					   aa->aa_ppga[aa->aa_page_count-1]->off +
					   aa->aa_ppga[aa->aa_page_count-1]->count -
					   1);
			CERROR("client %x, server %x, cksum_type %x\n",
			       client_cksum, server_cksum, cksum_type);
			cksum_counter = 0;
			aa->aa_oa->o_cksum = client_cksum;
			rc = -EAGAIN;
		} else {
			cksum_counter++;
			CDEBUG(D_PAGE, "checksum %x confirmed\n", client_cksum);
			rc = 0;
		}
	} else if (unlikely(client_cksum)) {
		static int cksum_missed;

		cksum_missed++;
		if ((cksum_missed & (-cksum_missed)) == cksum_missed)
			CERROR("Checksum %u requested from %s but not sent\n",
			       cksum_missed, libcfs_nid2str(peer->nid));
	} else {
		rc = 0;
	}
out:
	if (rc >= 0)
		lustre_get_wire_obdo(&req->rq_import->imp_connect_data,
				     aa->aa_oa, &body->oa);

	RETURN(rc);
}

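/*
 * Synchronous BRW: loop on ptlrpc_queue_wait() with its own resend policy.
 * -EINPROGRESS replies are always retried; other recoverable errors are
 * bounded by client_should_resend() and abandoned across an import
 * generation change (eviction).
 */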
1647static int osc_brw_internal(int cmd, struct obd_export *exp, struct obdo *oa,
1648 struct lov_stripe_md *lsm,
1649 obd_count page_count, struct brw_page **pga,
1650 struct obd_capa *ocapa)
1651{
1652 struct ptlrpc_request *req;
1653 int rc;
1654 wait_queue_head_t waitq;
1655 int generation, resends = 0;
1656 struct l_wait_info lwi;
1657
1658 ENTRY;
1659
1660 init_waitqueue_head(&waitq);
1661 generation = exp->exp_obd->u.cli.cl_import->imp_generation;
1662
1663restart_bulk:
1664 rc = osc_brw_prep_request(cmd, &exp->exp_obd->u.cli, oa, lsm,
1665 page_count, pga, &req, ocapa, 0, resends);
1666 if (rc != 0)
1667 return (rc);
1668
1669 if (resends) {
1670 req->rq_generation_set = 1;
1671 req->rq_import_generation = generation;
1672 req->rq_sent = cfs_time_current_sec() + resends;
1673 }
1674
1675 rc = ptlrpc_queue_wait(req);
1676
1677 if (rc == -ETIMEDOUT && req->rq_resend) {
1678 DEBUG_REQ(D_HA, req, "BULK TIMEOUT");
1679 ptlrpc_req_finished(req);
1680 goto restart_bulk;
1681 }
1682
1683 rc = osc_brw_fini_request(req, rc);
1684
1685 ptlrpc_req_finished(req);
1686 /* When server return -EINPROGRESS, client should always retry
1687 * regardless of the number of times the bulk was resent already.*/
1688 if (osc_recoverable_error(rc)) {
1689 resends++;
1690 if (rc != -EINPROGRESS &&
1691 !client_should_resend(resends, &exp->exp_obd->u.cli)) {
1692 CERROR("%s: too many resend retries for object: "
1693 ""DOSTID", rc = %d.\n", exp->exp_obd->obd_name,
1694 POSTID(&oa->o_oi), rc);
1695 goto out;
1696 }
1697 if (generation !=
1698 exp->exp_obd->u.cli.cl_import->imp_generation) {
1699 CDEBUG(D_HA, "%s: resend cross eviction for object: "
1700 ""DOSTID", rc = %d.\n", exp->exp_obd->obd_name,
1701 POSTID(&oa->o_oi), rc);
1702 goto out;
1703 }
1704
1705 lwi = LWI_TIMEOUT_INTR(cfs_time_seconds(resends), NULL, NULL,
1706 NULL);
1707 l_wait_event(waitq, 0, &lwi);
1708
1709 goto restart_bulk;
1710 }
1711out:
1712 if (rc == -EAGAIN || rc == -EINPROGRESS)
1713 rc = -EIO;
1714 RETURN (rc);
1715}
1716
1717static int osc_brw_redo_request(struct ptlrpc_request *request,
1718 struct osc_brw_async_args *aa, int rc)
1719{
1720 struct ptlrpc_request *new_req;
1721 struct osc_brw_async_args *new_aa;
1722 struct osc_async_page *oap;
1723 ENTRY;
1724
1725 DEBUG_REQ(rc == -EINPROGRESS ? D_RPCTRACE : D_ERROR, request,
1726 "redo for recoverable error %d", rc);
1727
1728 rc = osc_brw_prep_request(lustre_msg_get_opc(request->rq_reqmsg) ==
1729 OST_WRITE ? OBD_BRW_WRITE :OBD_BRW_READ,
1730 aa->aa_cli, aa->aa_oa,
1731 NULL /* lsm unused by osc currently */,
1732 aa->aa_page_count, aa->aa_ppga,
1733 &new_req, aa->aa_ocapa, 0, 1);
1734 if (rc)
1735 RETURN(rc);
1736
1737 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
1738 if (oap->oap_request != NULL) {
1739 LASSERTF(request == oap->oap_request,
1740 "request %p != oap_request %p\n",
1741 request, oap->oap_request);
1742 if (oap->oap_interrupted) {
1743 ptlrpc_req_finished(new_req);
1744 RETURN(-EINTR);
1745 }
1746 }
1747 }
1748 /* New request takes over pga and oaps from old request.
1749 * Note that copying a list_head doesn't work, need to move it... */
1750 aa->aa_resends++;
1751 new_req->rq_interpret_reply = request->rq_interpret_reply;
1752 new_req->rq_async_args = request->rq_async_args;
d7e09d03
PT
1753 /* cap resend delay to the current request timeout, this is similar to
1754 * what ptlrpc does (see after_reply()) */
1755 if (aa->aa_resends > new_req->rq_timeout)
1756 new_req->rq_sent = cfs_time_current_sec() + new_req->rq_timeout;
1757 else
1758 new_req->rq_sent = cfs_time_current_sec() + aa->aa_resends;
1759 new_req->rq_generation_set = 1;
1760 new_req->rq_import_generation = request->rq_import_generation;
1761
1762 new_aa = ptlrpc_req_async_args(new_req);
1763
1764 INIT_LIST_HEAD(&new_aa->aa_oaps);
1765 list_splice_init(&aa->aa_oaps, &new_aa->aa_oaps);
1766 INIT_LIST_HEAD(&new_aa->aa_exts);
1767 list_splice_init(&aa->aa_exts, &new_aa->aa_exts);
1768 new_aa->aa_resends = aa->aa_resends;
1769
1770 list_for_each_entry(oap, &new_aa->aa_oaps, oap_rpc_item) {
1771 if (oap->oap_request) {
1772 ptlrpc_req_finished(oap->oap_request);
1773 oap->oap_request = ptlrpc_request_addref(new_req);
1774 }
1775 }
1776
1777 new_aa->aa_ocapa = aa->aa_ocapa;
1778 aa->aa_ocapa = NULL;
1779
1780 /* XXX: This code will run into problem if we're going to support
1781 * to add a series of BRW RPCs into a self-defined ptlrpc_request_set
1782 * and wait for all of them to be finished. We should inherit request
1783 * set from old request. */
1784 ptlrpcd_add_req(new_req, PDL_POLICY_SAME, -1);
1785
1786 DEBUG_REQ(D_INFO, new_req, "new request");
1787 RETURN(0);
1788}
1789
1790/*
1791 * ugh, we want disk allocation on the target to happen in offset order. we'll
1792 * follow sedgewicks advice and stick to the dead simple shellsort -- it'll do
1793 * fine for our small page arrays and doesn't require allocation. its an
1794 * insertion sort that swaps elements that are strides apart, shrinking the
1795 * stride down until its '1' and the array is sorted.
1796 */
1797static void sort_brw_pages(struct brw_page **array, int num)
1798{
1799 int stride, i, j;
1800 struct brw_page *tmp;
1801
1802 if (num == 1)
1803 return;
1804 for (stride = 1; stride < num ; stride = (stride * 3) + 1)
1805 ;
1806
1807 do {
1808 stride /= 3;
1809 for (i = stride ; i < num ; i++) {
1810 tmp = array[i];
1811 j = i;
1812 while (j >= stride && array[j - stride]->off > tmp->off) {
1813 array[j] = array[j - stride];
1814 j -= stride;
1815 }
1816 array[j] = tmp;
1817 }
1818 } while (stride > 1);
1819}
1820
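/* Count how many of the leading (offset-sorted) pages form one
 * unfragmented run: the first page may start mid-page, but each page in
 * the run must end on a page boundary and every subsequent page must
 * start on one. */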
1821static obd_count max_unfragmented_pages(struct brw_page **pg, obd_count pages)
1822{
1823 int count = 1;
1824 int offset;
1825 int i = 0;
1826
1827 LASSERT (pages > 0);
1828 offset = pg[i]->off & ~CFS_PAGE_MASK;
1829
1830 for (;;) {
1831 pages--;
1832 if (pages == 0) /* that's all */
1833 return count;
1834
1835 if (offset + pg[i]->count < PAGE_CACHE_SIZE)
1836 return count; /* doesn't end on page boundary */
1837
1838 i++;
1839 offset = pg[i]->off & ~CFS_PAGE_MASK;
1840 if (offset != 0) /* doesn't start on page boundary */
1841 return count;
1842
1843 count++;
1844 }
1845}
1846
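/* Build an array of pointers into the flat brw_page array so the pages
 * can be sorted and carved into per-RPC chunks without moving the
 * brw_page structures themselves. */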
1847static struct brw_page **osc_build_ppga(struct brw_page *pga, obd_count count)
1848{
1849 struct brw_page **ppga;
1850 int i;
1851
1852 OBD_ALLOC(ppga, sizeof(*ppga) * count);
1853 if (ppga == NULL)
1854 return NULL;
1855
1856 for (i = 0; i < count; i++)
1857 ppga[i] = pga + i;
1858 return ppga;
1859}
1860
1861static void osc_release_ppga(struct brw_page **ppga, obd_count count)
1862{
1863 LASSERT(ppga != NULL);
1864 OBD_FREE(ppga, sizeof(*ppga) * count);
1865}
1866
1867static int osc_brw(int cmd, struct obd_export *exp, struct obd_info *oinfo,
1868 obd_count page_count, struct brw_page *pga,
1869 struct obd_trans_info *oti)
1870{
1871 struct obdo *saved_oa = NULL;
1872 struct brw_page **ppga, **orig;
1873 struct obd_import *imp = class_exp2cliimp(exp);
1874 struct client_obd *cli;
1875 int rc, page_count_orig;
1876 ENTRY;
1877
1878 LASSERT((imp != NULL) && (imp->imp_obd != NULL));
1879 cli = &imp->imp_obd->u.cli;
1880
1881 if (cmd & OBD_BRW_CHECK) {
1882 /* The caller just wants to know if there's a chance that this
1883 * I/O can succeed */
1884
1885 if (imp->imp_invalid)
1886 RETURN(-EIO);
1887 RETURN(0);
1888 }
1889
1890 /* test_brw with a failed create can trip this, maybe others. */
1891 LASSERT(cli->cl_max_pages_per_rpc);
1892
1893 rc = 0;
1894
1895 orig = ppga = osc_build_ppga(pga, page_count);
1896 if (ppga == NULL)
1897 RETURN(-ENOMEM);
1898 page_count_orig = page_count;
1899
1900 sort_brw_pages(ppga, page_count);
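	/* Issue the I/O in chunks: each BRW RPC carries at most
	 * cl_max_pages_per_rpc pages, further trimmed to the leading
	 * unfragmented run so the target sees page-aligned I/O. */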
1901 while (page_count) {
1902 obd_count pages_per_brw;
1903
1904 if (page_count > cli->cl_max_pages_per_rpc)
1905 pages_per_brw = cli->cl_max_pages_per_rpc;
1906 else
1907 pages_per_brw = page_count;
1908
1909 pages_per_brw = max_unfragmented_pages(ppga, pages_per_brw);
1910
1911 if (saved_oa != NULL) {
1912 /* restore previously saved oa */
1913 *oinfo->oi_oa = *saved_oa;
1914 } else if (page_count > pages_per_brw) {
1915 /* save a copy of oa (brw will clobber it) */
1916 OBDO_ALLOC(saved_oa);
1917 if (saved_oa == NULL)
1918 GOTO(out, rc = -ENOMEM);
1919 *saved_oa = *oinfo->oi_oa;
1920 }
1921
1922 rc = osc_brw_internal(cmd, exp, oinfo->oi_oa, oinfo->oi_md,
1923 pages_per_brw, ppga, oinfo->oi_capa);
1924
1925 if (rc != 0)
1926 break;
1927
1928 page_count -= pages_per_brw;
1929 ppga += pages_per_brw;
1930 }
1931
1932out:
1933 osc_release_ppga(orig, page_count_orig);
1934
1935 if (saved_oa != NULL)
1936 OBDO_FREE(saved_oa);
1937
1938 RETURN(rc);
1939}
1940
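/* Completion callback for a BRW RPC: retries recoverable errors, updates
 * the cached object attributes from the returned obdo, finishes the
 * extents covered by the RPC and drops the per-client in-flight counter
 * before kicking further I/O. */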
1941static int brw_interpret(const struct lu_env *env,
1942 struct ptlrpc_request *req, void *data, int rc)
1943{
1944 struct osc_brw_async_args *aa = data;
1945 struct osc_extent *ext;
1946 struct osc_extent *tmp;
1947 struct cl_object *obj = NULL;
1948 struct client_obd *cli = aa->aa_cli;
1949 ENTRY;
1950
1951 rc = osc_brw_fini_request(req, rc);
1952 CDEBUG(D_INODE, "request %p aa %p rc %d\n", req, aa, rc);
1953	/* When the server returns -EINPROGRESS, the client should always
1954	 * retry, regardless of how many times the bulk was already resent. */
1955 if (osc_recoverable_error(rc)) {
1956 if (req->rq_import_generation !=
1957 req->rq_import->imp_generation) {
1958 CDEBUG(D_HA, "%s: resend cross eviction for object: "
1959 ""DOSTID", rc = %d.\n",
1960 req->rq_import->imp_obd->obd_name,
1961 POSTID(&aa->aa_oa->o_oi), rc);
1962 } else if (rc == -EINPROGRESS ||
1963 client_should_resend(aa->aa_resends, aa->aa_cli)) {
1964 rc = osc_brw_redo_request(req, aa, rc);
1965 } else {
1966 CERROR("%s: too many resent retries for object: "
1967 ""LPU64":"LPU64", rc = %d.\n",
1968 req->rq_import->imp_obd->obd_name,
1969 POSTID(&aa->aa_oa->o_oi), rc);
1970 }
1971
1972 if (rc == 0)
1973 RETURN(0);
1974 else if (rc == -EAGAIN || rc == -EINPROGRESS)
1975 rc = -EIO;
1976 }
1977
1978 if (aa->aa_ocapa) {
1979 capa_put(aa->aa_ocapa);
1980 aa->aa_ocapa = NULL;
1981 }
1982
1983 list_for_each_entry_safe(ext, tmp, &aa->aa_exts, oe_link) {
1984 if (obj == NULL && rc == 0) {
1985 obj = osc2cl(ext->oe_obj);
1986 cl_object_get(obj);
1987 }
1988
1989 list_del_init(&ext->oe_link);
1990 osc_extent_finish(env, ext, 1, rc);
1991 }
1992 LASSERT(list_empty(&aa->aa_exts));
1993 LASSERT(list_empty(&aa->aa_oaps));
1994
1995 if (obj != NULL) {
1996 struct obdo *oa = aa->aa_oa;
1997 struct cl_attr *attr = &osc_env_info(env)->oti_attr;
1998 unsigned long valid = 0;
1999
2000 LASSERT(rc == 0);
2001 if (oa->o_valid & OBD_MD_FLBLOCKS) {
2002 attr->cat_blocks = oa->o_blocks;
2003 valid |= CAT_BLOCKS;
2004 }
2005 if (oa->o_valid & OBD_MD_FLMTIME) {
2006 attr->cat_mtime = oa->o_mtime;
2007 valid |= CAT_MTIME;
2008 }
2009 if (oa->o_valid & OBD_MD_FLATIME) {
2010 attr->cat_atime = oa->o_atime;
2011 valid |= CAT_ATIME;
2012 }
2013 if (oa->o_valid & OBD_MD_FLCTIME) {
2014 attr->cat_ctime = oa->o_ctime;
2015 valid |= CAT_CTIME;
2016 }
2017 if (valid != 0) {
2018 cl_object_attr_lock(obj);
2019 cl_object_attr_set(env, obj, attr, valid);
2020 cl_object_attr_unlock(obj);
2021 }
2022 cl_object_put(env, obj);
2023 }
2024 OBDO_FREE(aa->aa_oa);
2025
2026 cl_req_completion(env, aa->aa_clerq, rc < 0 ? rc :
2027 req->rq_bulk->bd_nob_transferred);
2028 osc_release_ppga(aa->aa_ppga, aa->aa_page_count);
2029 ptlrpc_lprocfs_brw(req, req->rq_bulk->bd_nob_transferred);
2030
2031 client_obd_list_lock(&cli->cl_loi_list_lock);
2032 /* We need to decrement before osc_ap_completion->osc_wake_cache_waiters
2033 * is called so we know whether to go to sync BRWs or wait for more
2034 * RPCs to complete */
2035 if (lustre_msg_get_opc(req->rq_reqmsg) == OST_WRITE)
2036 cli->cl_w_in_flight--;
2037 else
2038 cli->cl_r_in_flight--;
2039 osc_wake_cache_waiters(cli);
2040 client_obd_list_unlock(&cli->cl_loi_list_lock);
2041
2042 osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
2043 RETURN(rc);
2044}
2045
2046/**
2047 * Build an RPC from the list of extents @ext_list. The caller must ensure
2048 * that the total number of pages in this list does not exceed the maximum
2049 * pages per RPC. Extents in the list must be in OES_RPC state.
2050 */
2051int osc_build_rpc(const struct lu_env *env, struct client_obd *cli,
2052 struct list_head *ext_list, int cmd, pdl_policy_t pol)
2053{
2054 struct ptlrpc_request *req = NULL;
2055 struct osc_extent *ext;
2056 struct brw_page **pga = NULL;
2057 struct osc_brw_async_args *aa = NULL;
2058 struct obdo *oa = NULL;
2059 struct osc_async_page *oap;
2060 struct osc_async_page *tmp;
2061 struct cl_req *clerq = NULL;
2062 enum cl_req_type crt = (cmd & OBD_BRW_WRITE) ? CRT_WRITE :
2063 CRT_READ;
2064 struct ldlm_lock *lock = NULL;
2065 struct cl_req_attr *crattr = NULL;
2066 obd_off starting_offset = OBD_OBJECT_EOF;
2067 obd_off ending_offset = 0;
2068 int mpflag = 0;
2069 int mem_tight = 0;
2070 int page_count = 0;
2071 int i;
2072 int rc;
2073	LIST_HEAD(rpc_list);
2074
2075 ENTRY;
2076 LASSERT(!list_empty(ext_list));
2077
2078 /* add pages into rpc_list to build BRW rpc */
2079 list_for_each_entry(ext, ext_list, oe_link) {
2080 LASSERT(ext->oe_state == OES_RPC);
2081 mem_tight |= ext->oe_memalloc;
2082 list_for_each_entry(oap, &ext->oe_pages, oap_pending_item) {
2083 ++page_count;
2084 list_add_tail(&oap->oap_rpc_item, &rpc_list);
2085 if (starting_offset > oap->oap_obj_off)
2086 starting_offset = oap->oap_obj_off;
2087 else
2088 LASSERT(oap->oap_page_off == 0);
2089 if (ending_offset < oap->oap_obj_off + oap->oap_count)
2090 ending_offset = oap->oap_obj_off +
2091 oap->oap_count;
2092 else
2093 LASSERT(oap->oap_page_off + oap->oap_count ==
2094 PAGE_CACHE_SIZE);
2095 }
2096 }
2097
2098 if (mem_tight)
2099 mpflag = cfs_memory_pressure_get_and_set();
2100
2101 OBD_ALLOC(crattr, sizeof(*crattr));
2102 if (crattr == NULL)
2103 GOTO(out, rc = -ENOMEM);
2104
2105 OBD_ALLOC(pga, sizeof(*pga) * page_count);
2106 if (pga == NULL)
2107 GOTO(out, rc = -ENOMEM);
2108
2109 OBDO_ALLOC(oa);
2110 if (oa == NULL)
2111 GOTO(out, rc = -ENOMEM);
2112
2113 i = 0;
2114 list_for_each_entry(oap, &rpc_list, oap_rpc_item) {
2115 struct cl_page *page = oap2cl_page(oap);
2116 if (clerq == NULL) {
2117 clerq = cl_req_alloc(env, page, crt,
2118					     1 /* only 1-object rpcs for now */);
2119 if (IS_ERR(clerq))
2120 GOTO(out, rc = PTR_ERR(clerq));
2121 lock = oap->oap_ldlm_lock;
2122 }
2123 if (mem_tight)
2124 oap->oap_brw_flags |= OBD_BRW_MEMALLOC;
2125 pga[i] = &oap->oap_brw_page;
2126 pga[i]->off = oap->oap_obj_off + oap->oap_page_off;
2127 CDEBUG(0, "put page %p index %lu oap %p flg %x to pga\n",
2128 pga[i]->pg, page_index(oap->oap_page), oap,
2129 pga[i]->flag);
2130 i++;
2131 cl_req_page_add(env, clerq, page);
2132 }
2133
2134 /* always get the data for the obdo for the rpc */
2135 LASSERT(clerq != NULL);
2136 crattr->cra_oa = oa;
2137 cl_req_attr_set(env, clerq, crattr, ~0ULL);
2138 if (lock) {
2139 oa->o_handle = lock->l_remote_handle;
2140 oa->o_valid |= OBD_MD_FLHANDLE;
2141 }
2142
2143 rc = cl_req_prep(env, clerq);
2144 if (rc != 0) {
2145 CERROR("cl_req_prep failed: %d\n", rc);
2146 GOTO(out, rc);
2147 }
2148
2149 sort_brw_pages(pga, page_count);
2150 rc = osc_brw_prep_request(cmd, cli, oa, NULL, page_count,
2151				  pga, &req, crattr->cra_capa, 1, 0);
2152 if (rc != 0) {
2153 CERROR("prep_req failed: %d\n", rc);
2154 GOTO(out, rc);
2155 }
2156
2157 req->rq_interpret_reply = brw_interpret;
2158
2159 if (mem_tight != 0)
2160 req->rq_memalloc = 1;
2161
2162 /* Need to update the timestamps after the request is built in case
2163 * we race with setattr (locally or in queue at OST). If OST gets
2164 * later setattr before earlier BRW (as determined by the request xid),
2165 * the OST will not use BRW timestamps. Sadly, there is no obvious
2166 * way to do this in a single call. bug 10150 */
2167	cl_req_attr_set(env, clerq, crattr,
2168 OBD_MD_FLMTIME|OBD_MD_FLCTIME|OBD_MD_FLATIME);
2169
2170	lustre_msg_set_jobid(req->rq_reqmsg, crattr->cra_jobid);
2171
2172 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
2173 aa = ptlrpc_req_async_args(req);
2174 INIT_LIST_HEAD(&aa->aa_oaps);
2175 list_splice_init(&rpc_list, &aa->aa_oaps);
2176 INIT_LIST_HEAD(&aa->aa_exts);
2177 list_splice_init(ext_list, &aa->aa_exts);
2178 aa->aa_clerq = clerq;
2179
2180 /* queued sync pages can be torn down while the pages
2181 * were between the pending list and the rpc */
2182 tmp = NULL;
2183 list_for_each_entry(oap, &aa->aa_oaps, oap_rpc_item) {
2184 /* only one oap gets a request reference */
2185 if (tmp == NULL)
2186 tmp = oap;
2187 if (oap->oap_interrupted && !req->rq_intr) {
2188 CDEBUG(D_INODE, "oap %p in req %p interrupted\n",
2189 oap, req);
2190 ptlrpc_mark_interrupted(req);
2191 }
2192 }
2193 if (tmp != NULL)
2194 tmp->oap_request = ptlrpc_request_addref(req);
2195
2196 client_obd_list_lock(&cli->cl_loi_list_lock);
2197 starting_offset >>= PAGE_CACHE_SHIFT;
2198 if (cmd == OBD_BRW_READ) {
2199 cli->cl_r_in_flight++;
2200 lprocfs_oh_tally_log2(&cli->cl_read_page_hist, page_count);
2201 lprocfs_oh_tally(&cli->cl_read_rpc_hist, cli->cl_r_in_flight);
2202 lprocfs_oh_tally_log2(&cli->cl_read_offset_hist,
2203 starting_offset + 1);
2204 } else {
2205 cli->cl_w_in_flight++;
2206 lprocfs_oh_tally_log2(&cli->cl_write_page_hist, page_count);
2207 lprocfs_oh_tally(&cli->cl_write_rpc_hist, cli->cl_w_in_flight);
2208 lprocfs_oh_tally_log2(&cli->cl_write_offset_hist,
2209 starting_offset + 1);
2210 }
2211 client_obd_list_unlock(&cli->cl_loi_list_lock);
2212
2213 DEBUG_REQ(D_INODE, req, "%d pages, aa %p. now %dr/%dw in flight",
2214 page_count, aa, cli->cl_r_in_flight,
2215 cli->cl_w_in_flight);
2216
2217 /* XXX: Maybe the caller can check the RPC bulk descriptor to
2218 * see which CPU/NUMA node the majority of pages were allocated
2219 * on, and try to assign the async RPC to the CPU core
2220 * (PDL_POLICY_PREFERRED) to reduce cross-CPU memory traffic.
2221 *
2222 * But on the other hand, we expect that multiple ptlrpcd
2223 * threads and the initial write sponsor can run in parallel,
2224 * especially when data checksum is enabled, which is CPU-bound
2225 * operation and single ptlrpcd thread cannot process in time.
2226 * So more ptlrpcd threads sharing BRW load
2227 * (with PDL_POLICY_ROUND) seems better.
2228 */
2229 ptlrpcd_add_req(req, pol, -1);
2230 rc = 0;
2231 EXIT;
2232
2233out:
2234 if (mem_tight != 0)
2235 cfs_memory_pressure_restore(mpflag);
2236
2237 if (crattr != NULL) {
2238 capa_put(crattr->cra_capa);
2239 OBD_FREE(crattr, sizeof(*crattr));
2240 }
2241
2242 if (rc != 0) {
2243 LASSERT(req == NULL);
2244
2245 if (oa)
2246 OBDO_FREE(oa);
2247 if (pga)
2248 OBD_FREE(pga, sizeof(*pga) * page_count);
2249 /* this should happen rarely and is pretty bad, it makes the
2250 * pending list not follow the dirty order */
2251 while (!list_empty(ext_list)) {
2252 ext = list_entry(ext_list->next, struct osc_extent,
2253 oe_link);
2254 list_del_init(&ext->oe_link);
2255 osc_extent_finish(env, ext, 0, rc);
2256 }
2257 if (clerq && !IS_ERR(clerq))
2258 cl_req_completion(env, clerq, rc);
2259 }
2260 RETURN(rc);
2261}
2262
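/* Attach cl_lock data to an LDLM lock if it has none yet. Returns 1 if
 * the lock's l_ast_data now matches @einfo->ei_cbdata, 0 otherwise. */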
2263static int osc_set_lock_data_with_check(struct ldlm_lock *lock,
2264 struct ldlm_enqueue_info *einfo)
2265{
2266 void *data = einfo->ei_cbdata;
2267 int set = 0;
2268
2269 LASSERT(lock != NULL);
2270 LASSERT(lock->l_blocking_ast == einfo->ei_cb_bl);
2271 LASSERT(lock->l_resource->lr_type == einfo->ei_type);
2272 LASSERT(lock->l_completion_ast == einfo->ei_cb_cp);
2273 LASSERT(lock->l_glimpse_ast == einfo->ei_cb_gl);
2274
2275 lock_res_and_lock(lock);
2276 spin_lock(&osc_ast_guard);
2277
2278 if (lock->l_ast_data == NULL)
2279 lock->l_ast_data = data;
2280 if (lock->l_ast_data == data)
2281 set = 1;
2282
2283 spin_unlock(&osc_ast_guard);
2284 unlock_res_and_lock(lock);
2285
2286 return set;
2287}
2288
2289static int osc_set_data_with_check(struct lustre_handle *lockh,
2290 struct ldlm_enqueue_info *einfo)
2291{
2292 struct ldlm_lock *lock = ldlm_handle2lock(lockh);
2293 int set = 0;
2294
2295 if (lock != NULL) {
2296 set = osc_set_lock_data_with_check(lock, einfo);
2297 LDLM_LOCK_PUT(lock);
2298 } else
2299 CERROR("lockh %p, data %p - client evicted?\n",
2300 lockh, einfo->ei_cbdata);
2301 return set;
2302}
2303
2304static int osc_change_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2305 ldlm_iterator_t replace, void *data)
2306{
2307 struct ldlm_res_id res_id;
2308 struct obd_device *obd = class_exp2obd(exp);
2309
2310 ostid_build_res_name(&lsm->lsm_oi, &res_id);
2311 ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2312 return 0;
2313}
2314
2315/* Find any ldlm lock of the inode in osc.
2316 * Return: 0 if no lock was found,
2317 *	   1 if a lock was found,
2318 *	   < 0 on error. */
2319static int osc_find_cbdata(struct obd_export *exp, struct lov_stripe_md *lsm,
2320 ldlm_iterator_t replace, void *data)
2321{
2322 struct ldlm_res_id res_id;
2323 struct obd_device *obd = class_exp2obd(exp);
2324 int rc = 0;
2325
2326 ostid_build_res_name(&lsm->lsm_oi, &res_id);
2327 rc = ldlm_resource_iterate(obd->obd_namespace, &res_id, replace, data);
2328 if (rc == LDLM_ITER_STOP)
2329 return(1);
2330 if (rc == LDLM_ITER_CONTINUE)
2331 return(0);
2332 return(rc);
2333}
2334
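/* Finish an enqueue: for intent enqueues pick up the intent result from
 * the DLM reply, mark the LVB ready when the enqueue (or non-AGL
 * glimpse) succeeded, then run the caller's update callback with the
 * final status. */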
2335static int osc_enqueue_fini(struct ptlrpc_request *req, struct ost_lvb *lvb,
2336 obd_enqueue_update_f upcall, void *cookie,
2337 __u64 *flags, int agl, int rc)
2338{
2339 int intent = *flags & LDLM_FL_HAS_INTENT;
2340 ENTRY;
2341
2342 if (intent) {
2343 /* The request was created before ldlm_cli_enqueue call. */
2344 if (rc == ELDLM_LOCK_ABORTED) {
2345 struct ldlm_reply *rep;
2346 rep = req_capsule_server_get(&req->rq_pill,
2347 &RMF_DLM_REP);
2348
2349 LASSERT(rep != NULL);
2350 if (rep->lock_policy_res1)
2351 rc = rep->lock_policy_res1;
2352 }
2353 }
2354
2355 if ((intent != 0 && rc == ELDLM_LOCK_ABORTED && agl == 0) ||
2356 (rc == 0)) {
2357 *flags |= LDLM_FL_LVB_READY;
2358 CDEBUG(D_INODE,"got kms "LPU64" blocks "LPU64" mtime "LPU64"\n",
2359 lvb->lvb_size, lvb->lvb_blocks, lvb->lvb_mtime);
2360 }
2361
2362 /* Call the update callback. */
2363 rc = (*upcall)(cookie, rc);
2364 RETURN(rc);
2365}
2366
2367static int osc_enqueue_interpret(const struct lu_env *env,
2368 struct ptlrpc_request *req,
2369 struct osc_enqueue_args *aa, int rc)
2370{
2371 struct ldlm_lock *lock;
2372 struct lustre_handle handle;
2373 __u32 mode;
2374 struct ost_lvb *lvb;
2375 __u32 lvb_len;
2376 __u64 *flags = aa->oa_flags;
2377
2378 /* Make a local copy of a lock handle and a mode, because aa->oa_*
2379 * might be freed anytime after lock upcall has been called. */
2380 lustre_handle_copy(&handle, aa->oa_lockh);
2381 mode = aa->oa_ei->ei_mode;
2382
2383 /* ldlm_cli_enqueue is holding a reference on the lock, so it must
2384 * be valid. */
2385 lock = ldlm_handle2lock(&handle);
2386
2387 /* Take an additional reference so that a blocking AST that
2388 * ldlm_cli_enqueue_fini() might post for a failed lock, is guaranteed
2389 * to arrive after an upcall has been executed by
2390 * osc_enqueue_fini(). */
2391 ldlm_lock_addref(&handle, mode);
2392
2393 /* Let CP AST to grant the lock first. */
2394 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_ENQ_RACE, 1);
2395
2396 if (aa->oa_agl && rc == ELDLM_LOCK_ABORTED) {
2397 lvb = NULL;
2398 lvb_len = 0;
2399 } else {
2400 lvb = aa->oa_lvb;
2401 lvb_len = sizeof(*aa->oa_lvb);
2402 }
2403
2404 /* Complete obtaining the lock procedure. */
2405 rc = ldlm_cli_enqueue_fini(aa->oa_exp, req, aa->oa_ei->ei_type, 1,
2406 mode, flags, lvb, lvb_len, &handle, rc);
2407 /* Complete osc stuff. */
2408 rc = osc_enqueue_fini(req, aa->oa_lvb, aa->oa_upcall, aa->oa_cookie,
2409 flags, aa->oa_agl, rc);
2410
2411 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_CP_CANCEL_RACE, 10);
2412
2413 /* Release the lock for async request. */
2414 if (lustre_handle_is_used(&handle) && rc == ELDLM_OK)
2415 /*
2416 * Releases a reference taken by ldlm_cli_enqueue(), if it is
2417 * not already released by
2418 * ldlm_cli_enqueue_fini()->failed_lock_cleanup()
2419 */
2420 ldlm_lock_decref(&handle, mode);
2421
2422 LASSERTF(lock != NULL, "lockh %p, req %p, aa %p - client evicted?\n",
2423 aa->oa_lockh, req, aa);
2424 ldlm_lock_decref(&handle, mode);
2425 LDLM_LOCK_PUT(lock);
2426 return rc;
2427}
2428
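/* Illustrative example (numbers not from the original source): a granted
 * PR lock on extent [0, 65535] lets the client trust sizes up to 65536,
 * so on success KMS below is raised to min(lvb_size, extent end + 1). */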
2429void osc_update_enqueue(struct lustre_handle *lov_lockhp,
2430 struct lov_oinfo *loi, int flags,
2431 struct ost_lvb *lvb, __u32 mode, int rc)
2432{
2433 struct ldlm_lock *lock = ldlm_handle2lock(lov_lockhp);
2434
2435 if (rc == ELDLM_OK) {
2436 __u64 tmp;
2437
2438 LASSERT(lock != NULL);
2439 loi->loi_lvb = *lvb;
2440 tmp = loi->loi_lvb.lvb_size;
2441 /* Extend KMS up to the end of this lock and no further
2442 * A lock on [x,y] means a KMS of up to y + 1 bytes! */
2443 if (tmp > lock->l_policy_data.l_extent.end)
2444 tmp = lock->l_policy_data.l_extent.end + 1;
2445 if (tmp >= loi->loi_kms) {
2446 LDLM_DEBUG(lock, "lock acquired, setting rss="LPU64
2447 ", kms="LPU64, loi->loi_lvb.lvb_size, tmp);
2448 loi_kms_set(loi, tmp);
2449 } else {
2450 LDLM_DEBUG(lock, "lock acquired, setting rss="
2451 LPU64"; leaving kms="LPU64", end="LPU64,
2452 loi->loi_lvb.lvb_size, loi->loi_kms,
2453 lock->l_policy_data.l_extent.end);
2454 }
2455 ldlm_lock_allow_match(lock);
2456 } else if (rc == ELDLM_LOCK_ABORTED && (flags & LDLM_FL_HAS_INTENT)) {
2457 LASSERT(lock != NULL);
2458 loi->loi_lvb = *lvb;
2459 ldlm_lock_allow_match(lock);
2460 CDEBUG(D_INODE, "glimpsed, setting rss="LPU64"; leaving"
2461 " kms="LPU64"\n", loi->loi_lvb.lvb_size, loi->loi_kms);
2462 rc = ELDLM_OK;
2463 }
2464
2465 if (lock != NULL) {
2466 if (rc != ELDLM_OK)
2467 ldlm_lock_fail_match(lock);
2468
2469 LDLM_LOCK_PUT(lock);
2470 }
2471}
2472EXPORT_SYMBOL(osc_update_enqueue);
2473
2474struct ptlrpc_request_set *PTLRPCD_SET = (void *)1;
2475
2476/* When enqueuing asynchronously, locks are not ordered: we can obtain a lock
2477 * from the 2nd OSC before a lock from the 1st one. This does not deadlock
2478 * with other synchronous requests, but holding some locks while trying to
2479 * obtain others may take a considerable amount of time in case of OST
2480 * failure; and when other sync requests do not get a released lock from a
2481 * client, the client is excluded from the cluster -- such scenarios make
2482 * life difficult, so release locks just after they are obtained. */
2483int osc_enqueue_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2484 __u64 *flags, ldlm_policy_data_t *policy,
2485 struct ost_lvb *lvb, int kms_valid,
2486 obd_enqueue_update_f upcall, void *cookie,
2487 struct ldlm_enqueue_info *einfo,
2488 struct lustre_handle *lockh,
2489 struct ptlrpc_request_set *rqset, int async, int agl)
2490{
2491 struct obd_device *obd = exp->exp_obd;
2492 struct ptlrpc_request *req = NULL;
2493 int intent = *flags & LDLM_FL_HAS_INTENT;
2494 int match_lvb = (agl != 0 ? 0 : LDLM_FL_LVB_READY);
2495 ldlm_mode_t mode;
2496 int rc;
2497 ENTRY;
2498
2499 /* Filesystem lock extents are extended to page boundaries so that
2500 * dealing with the page cache is a little smoother. */
2501 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2502 policy->l_extent.end |= ~CFS_PAGE_MASK;
2503
2504 /*
2505 * kms is not valid when either object is completely fresh (so that no
2506 * locks are cached), or object was evicted. In the latter case cached
2507 * lock cannot be used, because it would prime inode state with
2508 * potentially stale LVB.
2509 */
2510 if (!kms_valid)
2511 goto no_match;
2512
2513 /* Next, search for already existing extent locks that will cover us */
2514 /* If we're trying to read, we also search for an existing PW lock. The
2515 * VFS and page cache already protect us locally, so lots of readers/
2516 * writers can share a single PW lock.
2517 *
2518 * There are problems with conversion deadlocks, so instead of
2519 * converting a read lock to a write lock, we'll just enqueue a new
2520 * one.
2521 *
2522 * At some point we should cancel the read lock instead of making them
2523 * send us a blocking callback, but there are problems with canceling
2524 * locks out from other users right now, too. */
2525 mode = einfo->ei_mode;
2526 if (einfo->ei_mode == LCK_PR)
2527 mode |= LCK_PW;
2528 mode = ldlm_lock_match(obd->obd_namespace, *flags | match_lvb, res_id,
2529 einfo->ei_type, policy, mode, lockh, 0);
2530 if (mode) {
2531 struct ldlm_lock *matched = ldlm_handle2lock(lockh);
2532
2533 if ((agl != 0) && !(matched->l_flags & LDLM_FL_LVB_READY)) {
2534			/* For AGL, if the enqueue RPC is sent but the lock
2535			 * is not granted, then skip processing this stripe.
2536			 * Return -ECANCELED to tell the caller. */
2537 ldlm_lock_decref(lockh, mode);
2538 LDLM_LOCK_PUT(matched);
2539 RETURN(-ECANCELED);
2540 } else if (osc_set_lock_data_with_check(matched, einfo)) {
2541 *flags |= LDLM_FL_LVB_READY;
2542 /* addref the lock only if not async requests and PW
2543 * lock is matched whereas we asked for PR. */
2544 if (!rqset && einfo->ei_mode != mode)
2545 ldlm_lock_addref(lockh, LCK_PR);
2546 if (intent) {
2547 /* I would like to be able to ASSERT here that
2548 * rss <= kms, but I can't, for reasons which
2549 * are explained in lov_enqueue() */
2550 }
2551
2552 /* We already have a lock, and it's referenced.
2553 *
2554 * At this point, the cl_lock::cll_state is CLS_QUEUING,
2555 * AGL upcall may change it to CLS_HELD directly. */
2556 (*upcall)(cookie, ELDLM_OK);
2557
2558 if (einfo->ei_mode != mode)
2559 ldlm_lock_decref(lockh, LCK_PW);
2560 else if (rqset)
2561 /* For async requests, decref the lock. */
2562 ldlm_lock_decref(lockh, einfo->ei_mode);
2563 LDLM_LOCK_PUT(matched);
2564 RETURN(ELDLM_OK);
2565 } else {
2566 ldlm_lock_decref(lockh, mode);
2567 LDLM_LOCK_PUT(matched);
2568 }
2569 }
2570
2571 no_match:
2572 if (intent) {
2573 LIST_HEAD(cancels);
2574 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
2575 &RQF_LDLM_ENQUEUE_LVB);
2576 if (req == NULL)
2577 RETURN(-ENOMEM);
2578
2579 rc = ldlm_prep_enqueue_req(exp, req, &cancels, 0);
2580 if (rc) {
2581 ptlrpc_request_free(req);
2582 RETURN(rc);
2583 }
2584
2585 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER,
2586 sizeof *lvb);
2587 ptlrpc_request_set_replen(req);
2588 }
2589
2590 /* users of osc_enqueue() can pass this flag for ldlm_lock_match() */
2591 *flags &= ~LDLM_FL_BLOCK_GRANTED;
2592
2593 rc = ldlm_cli_enqueue(exp, &req, einfo, res_id, policy, flags, lvb,
2594 sizeof(*lvb), LVB_T_OST, lockh, async);
2595 if (rqset) {
2596 if (!rc) {
2597 struct osc_enqueue_args *aa;
2598 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2599 aa = ptlrpc_req_async_args(req);
2600 aa->oa_ei = einfo;
2601 aa->oa_exp = exp;
2602 aa->oa_flags = flags;
2603 aa->oa_upcall = upcall;
2604 aa->oa_cookie = cookie;
2605 aa->oa_lvb = lvb;
2606 aa->oa_lockh = lockh;
2607 aa->oa_agl = !!agl;
2608
2609 req->rq_interpret_reply =
2610 (ptlrpc_interpterer_t)osc_enqueue_interpret;
2611 if (rqset == PTLRPCD_SET)
2612 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
2613 else
2614 ptlrpc_set_add_req(rqset, req);
2615 } else if (intent) {
2616 ptlrpc_req_finished(req);
2617 }
2618 RETURN(rc);
2619 }
2620
2621 rc = osc_enqueue_fini(req, lvb, upcall, cookie, flags, agl, rc);
2622 if (intent)
2623 ptlrpc_req_finished(req);
2624
2625 RETURN(rc);
2626}
2627
2628static int osc_enqueue(struct obd_export *exp, struct obd_info *oinfo,
2629 struct ldlm_enqueue_info *einfo,
2630 struct ptlrpc_request_set *rqset)
2631{
2632 struct ldlm_res_id res_id;
2633 int rc;
2634 ENTRY;
2635
2636 ostid_build_res_name(&oinfo->oi_md->lsm_oi, &res_id);
2637 rc = osc_enqueue_base(exp, &res_id, &oinfo->oi_flags, &oinfo->oi_policy,
2638 &oinfo->oi_md->lsm_oinfo[0]->loi_lvb,
2639 oinfo->oi_md->lsm_oinfo[0]->loi_kms_valid,
2640 oinfo->oi_cb_up, oinfo, einfo, oinfo->oi_lockh,
2641 rqset, rqset != NULL, 0);
2642 RETURN(rc);
2643}
2644
2645int osc_match_base(struct obd_export *exp, struct ldlm_res_id *res_id,
2646 __u32 type, ldlm_policy_data_t *policy, __u32 mode,
2647 int *flags, void *data, struct lustre_handle *lockh,
2648 int unref)
2649{
2650 struct obd_device *obd = exp->exp_obd;
2651 int lflags = *flags;
2652 ldlm_mode_t rc;
2653 ENTRY;
2654
2655 if (OBD_FAIL_CHECK(OBD_FAIL_OSC_MATCH))
2656 RETURN(-EIO);
2657
2658 /* Filesystem lock extents are extended to page boundaries so that
2659 * dealing with the page cache is a little smoother */
2660 policy->l_extent.start -= policy->l_extent.start & ~CFS_PAGE_MASK;
2661 policy->l_extent.end |= ~CFS_PAGE_MASK;
2662
2663 /* Next, search for already existing extent locks that will cover us */
2664 /* If we're trying to read, we also search for an existing PW lock. The
2665 * VFS and page cache already protect us locally, so lots of readers/
2666 * writers can share a single PW lock. */
2667 rc = mode;
2668 if (mode == LCK_PR)
2669 rc |= LCK_PW;
2670 rc = ldlm_lock_match(obd->obd_namespace, lflags,
2671 res_id, type, policy, rc, lockh, unref);
2672 if (rc) {
2673 if (data != NULL) {
2674 if (!osc_set_data_with_check(lockh, data)) {
2675 if (!(lflags & LDLM_FL_TEST_LOCK))
2676 ldlm_lock_decref(lockh, rc);
2677 RETURN(0);
2678 }
2679 }
2680 if (!(lflags & LDLM_FL_TEST_LOCK) && mode != rc) {
2681 ldlm_lock_addref(lockh, LCK_PR);
2682 ldlm_lock_decref(lockh, LCK_PW);
2683 }
2684 RETURN(rc);
2685 }
2686 RETURN(rc);
2687}
2688
2689int osc_cancel_base(struct lustre_handle *lockh, __u32 mode)
2690{
2691 ENTRY;
2692
2693 if (unlikely(mode == LCK_GROUP))
2694 ldlm_lock_decref_and_cancel(lockh, mode);
2695 else
2696 ldlm_lock_decref(lockh, mode);
2697
2698 RETURN(0);
2699}
2700
2701static int osc_cancel(struct obd_export *exp, struct lov_stripe_md *md,
2702 __u32 mode, struct lustre_handle *lockh)
2703{
2704 ENTRY;
2705 RETURN(osc_cancel_base(lockh, mode));
2706}
2707
2708static int osc_cancel_unused(struct obd_export *exp,
2709 struct lov_stripe_md *lsm,
2710 ldlm_cancel_flags_t flags,
2711 void *opaque)
2712{
2713 struct obd_device *obd = class_exp2obd(exp);
2714 struct ldlm_res_id res_id, *resp = NULL;
2715
2716 if (lsm != NULL) {
2717 ostid_build_res_name(&lsm->lsm_oi, &res_id);
2718 resp = &res_id;
2719 }
2720
2721 return ldlm_cli_cancel_unused(obd->obd_namespace, resp, flags, opaque);
2722}
2723
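/* Interpret callback for an async OST_STATFS request: copy the server's
 * obd_statfs reply into the caller's buffer, then run the oi_cb_up
 * upcall with the final status. */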
2724static int osc_statfs_interpret(const struct lu_env *env,
2725 struct ptlrpc_request *req,
2726 struct osc_async_args *aa, int rc)
2727{
2728 struct obd_statfs *msfs;
2729 ENTRY;
2730
2731 if (rc == -EBADR)
2732 /* The request has in fact never been sent
2733 * due to issues at a higher level (LOV).
2734 * Exit immediately since the caller is
2735 * aware of the problem and takes care
2736 * of the clean up */
2737 RETURN(rc);
2738
2739 if ((rc == -ENOTCONN || rc == -EAGAIN) &&
2740 (aa->aa_oi->oi_flags & OBD_STATFS_NODELAY))
2741 GOTO(out, rc = 0);
2742
2743 if (rc != 0)
2744 GOTO(out, rc);
2745
2746 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2747 if (msfs == NULL) {
2748 GOTO(out, rc = -EPROTO);
2749 }
2750
2751 *aa->aa_oi->oi_osfs = *msfs;
2752out:
2753 rc = aa->aa_oi->oi_cb_up(aa->aa_oi, rc);
2754 RETURN(rc);
2755}
2756
2757static int osc_statfs_async(struct obd_export *exp,
2758 struct obd_info *oinfo, __u64 max_age,
2759 struct ptlrpc_request_set *rqset)
2760{
2761 struct obd_device *obd = class_exp2obd(exp);
2762 struct ptlrpc_request *req;
2763 struct osc_async_args *aa;
2764 int rc;
2765 ENTRY;
2766
2767 /* We could possibly pass max_age in the request (as an absolute
2768 * timestamp or a "seconds.usec ago") so the target can avoid doing
2769 * extra calls into the filesystem if that isn't necessary (e.g.
2770 * during mount that would help a bit). Having relative timestamps
2771 * is not so great if request processing is slow, while absolute
2772 * timestamps are not ideal because they need time synchronization. */
2773 req = ptlrpc_request_alloc(obd->u.cli.cl_import, &RQF_OST_STATFS);
2774 if (req == NULL)
2775 RETURN(-ENOMEM);
2776
2777 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2778 if (rc) {
2779 ptlrpc_request_free(req);
2780 RETURN(rc);
2781 }
2782 ptlrpc_request_set_replen(req);
2783 req->rq_request_portal = OST_CREATE_PORTAL;
2784 ptlrpc_at_set_req_timeout(req);
2785
2786 if (oinfo->oi_flags & OBD_STATFS_NODELAY) {
2787		/* procfs requests must not be delayed or resent, to avoid deadlock */
2788 req->rq_no_resend = 1;
2789 req->rq_no_delay = 1;
2790 }
2791
2792 req->rq_interpret_reply = (ptlrpc_interpterer_t)osc_statfs_interpret;
2793 CLASSERT (sizeof(*aa) <= sizeof(req->rq_async_args));
2794 aa = ptlrpc_req_async_args(req);
2795 aa->aa_oi = oinfo;
2796
2797 ptlrpc_set_add_req(rqset, req);
2798 RETURN(0);
2799}
2800
2801static int osc_statfs(const struct lu_env *env, struct obd_export *exp,
2802 struct obd_statfs *osfs, __u64 max_age, __u32 flags)
2803{
2804 struct obd_device *obd = class_exp2obd(exp);
2805 struct obd_statfs *msfs;
2806 struct ptlrpc_request *req;
2807 struct obd_import *imp = NULL;
2808 int rc;
2809 ENTRY;
2810
2811	/* Since the request might also come from lprocfs, we need to
2812	 * sync this with client_disconnect_export() (bug 15684). */
2813 down_read(&obd->u.cli.cl_sem);
2814 if (obd->u.cli.cl_import)
2815 imp = class_import_get(obd->u.cli.cl_import);
2816 up_read(&obd->u.cli.cl_sem);
2817 if (!imp)
2818 RETURN(-ENODEV);
2819
2820 /* We could possibly pass max_age in the request (as an absolute
2821 * timestamp or a "seconds.usec ago") so the target can avoid doing
2822 * extra calls into the filesystem if that isn't necessary (e.g.
2823 * during mount that would help a bit). Having relative timestamps
2824 * is not so great if request processing is slow, while absolute
2825 * timestamps are not ideal because they need time synchronization. */
2826 req = ptlrpc_request_alloc(imp, &RQF_OST_STATFS);
2827
2828 class_import_put(imp);
2829
2830 if (req == NULL)
2831 RETURN(-ENOMEM);
2832
2833 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_STATFS);
2834 if (rc) {
2835 ptlrpc_request_free(req);
2836 RETURN(rc);
2837 }
2838 ptlrpc_request_set_replen(req);
2839 req->rq_request_portal = OST_CREATE_PORTAL;
2840 ptlrpc_at_set_req_timeout(req);
2841
2842 if (flags & OBD_STATFS_NODELAY) {
2843		/* procfs requests must not be delayed or resent, to avoid deadlock */
2844 req->rq_no_resend = 1;
2845 req->rq_no_delay = 1;
2846 }
2847
2848 rc = ptlrpc_queue_wait(req);
2849 if (rc)
2850 GOTO(out, rc);
2851
2852 msfs = req_capsule_server_get(&req->rq_pill, &RMF_OBD_STATFS);
2853 if (msfs == NULL) {
2854 GOTO(out, rc = -EPROTO);
2855 }
2856
2857 *osfs = *msfs;
2858
2859 EXIT;
2860 out:
2861 ptlrpc_req_finished(req);
2862 return rc;
2863}
2864
2865/* Retrieve object striping information.
2866 *
2867 * @lump is a pointer to an in-core struct with lmm_stripe_count indicating
2868 * the maximum number of OST indices which will fit in the user buffer.
2869 * lmm_magic must be LOV_USER_MAGIC_V1 or LOV_USER_MAGIC_V3 (we use only
2870 * one slot here). */
2871static int osc_getstripe(struct lov_stripe_md *lsm, struct lov_user_md *lump)
2872{
2873 /* we use lov_user_md_v3 because it is larger than lov_user_md_v1 */
2874 struct lov_user_md_v3 lum, *lumk;
2875 struct lov_user_ost_data_v1 *lmm_objects;
2876 int rc = 0, lum_size;
2877 ENTRY;
2878
2879 if (!lsm)
2880 RETURN(-ENODATA);
2881
2882	/* we only need the header part from user space to get lmm_magic and
2883	 * lmm_stripe_count (the header part is common to v1 and v3) */
2884 lum_size = sizeof(struct lov_user_md_v1);
2885 if (copy_from_user(&lum, lump, lum_size))
2886 RETURN(-EFAULT);
2887
2888 if ((lum.lmm_magic != LOV_USER_MAGIC_V1) &&
2889 (lum.lmm_magic != LOV_USER_MAGIC_V3))
2890 RETURN(-EINVAL);
2891
2892 /* lov_user_md_vX and lov_mds_md_vX must have the same size */
2893 LASSERT(sizeof(struct lov_user_md_v1) == sizeof(struct lov_mds_md_v1));
2894 LASSERT(sizeof(struct lov_user_md_v3) == sizeof(struct lov_mds_md_v3));
2895 LASSERT(sizeof(lum.lmm_objects[0]) == sizeof(lumk->lmm_objects[0]));
2896
2897 /* we can use lov_mds_md_size() to compute lum_size
2898 * because lov_user_md_vX and lov_mds_md_vX have the same size */
2899 if (lum.lmm_stripe_count > 0) {
2900 lum_size = lov_mds_md_size(lum.lmm_stripe_count, lum.lmm_magic);
2901 OBD_ALLOC(lumk, lum_size);
2902 if (!lumk)
2903 RETURN(-ENOMEM);
2904
2905 if (lum.lmm_magic == LOV_USER_MAGIC_V1)
2906 lmm_objects =
2907 &(((struct lov_user_md_v1 *)lumk)->lmm_objects[0]);
2908 else
2909 lmm_objects = &(lumk->lmm_objects[0]);
2910 lmm_objects->l_ost_oi = lsm->lsm_oi;
2911 } else {
2912 lum_size = lov_mds_md_size(0, lum.lmm_magic);
2913 lumk = &lum;
2914 }
2915
2916 lumk->lmm_oi = lsm->lsm_oi;
2917 lumk->lmm_stripe_count = 1;
2918
2919 if (copy_to_user(lump, lumk, lum_size))
2920 rc = -EFAULT;
2921
2922 if (lumk != &lum)
2923 OBD_FREE(lumk, lum_size);
2924
2925 RETURN(rc);
2926}
2927
2928
2929static int osc_iocontrol(unsigned int cmd, struct obd_export *exp, int len,
2930 void *karg, void *uarg)
2931{
2932 struct obd_device *obd = exp->exp_obd;
2933 struct obd_ioctl_data *data = karg;
2934 int err = 0;
2935 ENTRY;
2936
2937 if (!try_module_get(THIS_MODULE)) {
2938		CERROR("Can't get module. Is it alive?\n");
2939 return -EINVAL;
2940 }
2941 switch (cmd) {
2942 case OBD_IOC_LOV_GET_CONFIG: {
2943 char *buf;
2944 struct lov_desc *desc;
2945 struct obd_uuid uuid;
2946
2947 buf = NULL;
2948 len = 0;
2949 if (obd_ioctl_getdata(&buf, &len, (void *)uarg))
2950 GOTO(out, err = -EINVAL);
2951
2952 data = (struct obd_ioctl_data *)buf;
2953
2954 if (sizeof(*desc) > data->ioc_inllen1) {
2955 obd_ioctl_freedata(buf, len);
2956 GOTO(out, err = -EINVAL);
2957 }
2958
2959 if (data->ioc_inllen2 < sizeof(uuid)) {
2960 obd_ioctl_freedata(buf, len);
2961 GOTO(out, err = -EINVAL);
2962 }
2963
2964 desc = (struct lov_desc *)data->ioc_inlbuf1;
2965 desc->ld_tgt_count = 1;
2966 desc->ld_active_tgt_count = 1;
2967 desc->ld_default_stripe_count = 1;
2968 desc->ld_default_stripe_size = 0;
2969 desc->ld_default_stripe_offset = 0;
2970 desc->ld_pattern = 0;
2971 memcpy(&desc->ld_uuid, &obd->obd_uuid, sizeof(uuid));
2972
2973 memcpy(data->ioc_inlbuf2, &obd->obd_uuid, sizeof(uuid));
2974
2975 err = copy_to_user((void *)uarg, buf, len);
2976 if (err)
2977 err = -EFAULT;
2978 obd_ioctl_freedata(buf, len);
2979 GOTO(out, err);
2980 }
2981 case LL_IOC_LOV_SETSTRIPE:
2982 err = obd_alloc_memmd(exp, karg);
2983 if (err > 0)
2984 err = 0;
2985 GOTO(out, err);
2986 case LL_IOC_LOV_GETSTRIPE:
2987 err = osc_getstripe(karg, uarg);
2988 GOTO(out, err);
2989 case OBD_IOC_CLIENT_RECOVER:
2990 err = ptlrpc_recover_import(obd->u.cli.cl_import,
2991 data->ioc_inlbuf1, 0);
2992 if (err > 0)
2993 err = 0;
2994 GOTO(out, err);
2995 case IOC_OSC_SET_ACTIVE:
2996 err = ptlrpc_set_import_active(obd->u.cli.cl_import,
2997 data->ioc_offset);
2998 GOTO(out, err);
2999 case OBD_IOC_POLL_QUOTACHECK:
3000 err = osc_quota_poll_check(exp, (struct if_quotacheck *)karg);
3001 GOTO(out, err);
3002 case OBD_IOC_PING_TARGET:
3003 err = ptlrpc_obd_ping(obd);
3004 GOTO(out, err);
3005 default:
3006 CDEBUG(D_INODE, "unrecognised ioctl %#x by %s\n",
3007 cmd, current_comm());
3008 GOTO(out, err = -ENOTTY);
3009 }
3010out:
3011 module_put(THIS_MODULE);
3012 return err;
3013}
3014
3015static int osc_get_info(const struct lu_env *env, struct obd_export *exp,
3016 obd_count keylen, void *key, __u32 *vallen, void *val,
3017 struct lov_stripe_md *lsm)
3018{
3019 ENTRY;
3020 if (!vallen || !val)
3021 RETURN(-EFAULT);
3022
3023 if (KEY_IS(KEY_LOCK_TO_STRIPE)) {
3024 __u32 *stripe = val;
3025 *vallen = sizeof(*stripe);
3026 *stripe = 0;
3027 RETURN(0);
3028 } else if (KEY_IS(KEY_LAST_ID)) {
3029 struct ptlrpc_request *req;
3030 obd_id *reply;
3031 char *tmp;
3032 int rc;
3033
3034 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3035 &RQF_OST_GET_INFO_LAST_ID);
3036 if (req == NULL)
3037 RETURN(-ENOMEM);
3038
3039 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3040 RCL_CLIENT, keylen);
3041 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3042 if (rc) {
3043 ptlrpc_request_free(req);
3044 RETURN(rc);
3045 }
3046
3047 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3048 memcpy(tmp, key, keylen);
3049
3050 req->rq_no_delay = req->rq_no_resend = 1;
3051 ptlrpc_request_set_replen(req);
3052 rc = ptlrpc_queue_wait(req);
3053 if (rc)
3054 GOTO(out, rc);
3055
3056 reply = req_capsule_server_get(&req->rq_pill, &RMF_OBD_ID);
3057 if (reply == NULL)
3058 GOTO(out, rc = -EPROTO);
3059
3060 *((obd_id *)val) = *reply;
3061 out:
3062 ptlrpc_req_finished(req);
3063 RETURN(rc);
3064 } else if (KEY_IS(KEY_FIEMAP)) {
3065 struct ll_fiemap_info_key *fm_key =
3066 (struct ll_fiemap_info_key *)key;
3067 struct ldlm_res_id res_id;
3068 ldlm_policy_data_t policy;
3069 struct lustre_handle lockh;
3070 ldlm_mode_t mode = 0;
3071 struct ptlrpc_request *req;
3072 struct ll_user_fiemap *reply;
3073 char *tmp;
3074 int rc;
3075
3076 if (!(fm_key->fiemap.fm_flags & FIEMAP_FLAG_SYNC))
3077 goto skip_locking;
3078
3079 policy.l_extent.start = fm_key->fiemap.fm_start &
3080 CFS_PAGE_MASK;
3081
3082 if (OBD_OBJECT_EOF - fm_key->fiemap.fm_length <=
3083 fm_key->fiemap.fm_start + PAGE_CACHE_SIZE - 1)
3084 policy.l_extent.end = OBD_OBJECT_EOF;
3085 else
3086 policy.l_extent.end = (fm_key->fiemap.fm_start +
3087 fm_key->fiemap.fm_length +
3088 PAGE_CACHE_SIZE - 1) & CFS_PAGE_MASK;
3089
3090 ostid_build_res_name(&fm_key->oa.o_oi, &res_id);
3091 mode = ldlm_lock_match(exp->exp_obd->obd_namespace,
3092 LDLM_FL_BLOCK_GRANTED |
3093 LDLM_FL_LVB_READY,
3094 &res_id, LDLM_EXTENT, &policy,
3095 LCK_PR | LCK_PW, &lockh, 0);
3096 if (mode) { /* lock is cached on client */
3097 if (mode != LCK_PR) {
3098 ldlm_lock_addref(&lockh, LCK_PR);
3099 ldlm_lock_decref(&lockh, LCK_PW);
3100 }
3101 } else { /* no cached lock, needs acquire lock on server side */
3102 fm_key->oa.o_valid |= OBD_MD_FLFLAGS;
3103 fm_key->oa.o_flags |= OBD_FL_SRVLOCK;
3104 }
3105
3106skip_locking:
3107 req = ptlrpc_request_alloc(class_exp2cliimp(exp),
3108 &RQF_OST_GET_INFO_FIEMAP);
3109 if (req == NULL)
3110			GOTO(drop_lock, rc = -ENOMEM);
3111
3112 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_KEY,
3113 RCL_CLIENT, keylen);
3114 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3115 RCL_CLIENT, *vallen);
3116 req_capsule_set_size(&req->rq_pill, &RMF_FIEMAP_VAL,
3117 RCL_SERVER, *vallen);
3118
3119 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_GET_INFO);
3120 if (rc) {
3121 ptlrpc_request_free(req);
3122			GOTO(drop_lock, rc);
3123 }
3124
3125 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_KEY);
3126 memcpy(tmp, key, keylen);
3127 tmp = req_capsule_client_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3128 memcpy(tmp, val, *vallen);
3129
3130 ptlrpc_request_set_replen(req);
3131 rc = ptlrpc_queue_wait(req);
3132 if (rc)
3133			GOTO(fini_req, rc);
3134
3135 reply = req_capsule_server_get(&req->rq_pill, &RMF_FIEMAP_VAL);
3136 if (reply == NULL)
3137			GOTO(fini_req, rc = -EPROTO);
3138
3139 memcpy(val, reply, *vallen);
3140fini_req:
3141		ptlrpc_req_finished(req);
3142drop_lock:
3143 if (mode)
3144 ldlm_lock_decref(&lockh, LCK_PR);
3145 RETURN(rc);
3146 }
3147
3148 RETURN(-EINVAL);
3149}
3150
3151static int osc_set_info_async(const struct lu_env *env, struct obd_export *exp,
3152 obd_count keylen, void *key, obd_count vallen,
3153 void *val, struct ptlrpc_request_set *set)
3154{
3155 struct ptlrpc_request *req;
3156 struct obd_device *obd = exp->exp_obd;
3157 struct obd_import *imp = class_exp2cliimp(exp);
3158 char *tmp;
3159 int rc;
3160 ENTRY;
3161
3162 OBD_FAIL_TIMEOUT(OBD_FAIL_OSC_SHUTDOWN, 10);
3163
3164 if (KEY_IS(KEY_CHECKSUM)) {
3165 if (vallen != sizeof(int))
3166 RETURN(-EINVAL);
3167 exp->exp_obd->u.cli.cl_checksum = (*(int *)val) ? 1 : 0;
3168 RETURN(0);
3169 }
3170
3171 if (KEY_IS(KEY_SPTLRPC_CONF)) {
3172 sptlrpc_conf_client_adapt(obd);
3173 RETURN(0);
3174 }
3175
3176 if (KEY_IS(KEY_FLUSH_CTX)) {
3177 sptlrpc_import_flush_my_ctx(imp);
3178 RETURN(0);
3179 }
3180
3181 if (KEY_IS(KEY_CACHE_SET)) {
3182 struct client_obd *cli = &obd->u.cli;
3183
3184 LASSERT(cli->cl_cache == NULL); /* only once */
3185 cli->cl_cache = (struct cl_client_cache *)val;
3186 atomic_inc(&cli->cl_cache->ccc_users);
3187 cli->cl_lru_left = &cli->cl_cache->ccc_lru_left;
3188
3189 /* add this osc into entity list */
3190 LASSERT(list_empty(&cli->cl_lru_osc));
3191 spin_lock(&cli->cl_cache->ccc_lru_lock);
3192 list_add(&cli->cl_lru_osc, &cli->cl_cache->ccc_lru);
3193 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3194
3195 RETURN(0);
3196 }
3197
3198 if (KEY_IS(KEY_CACHE_LRU_SHRINK)) {
3199 struct client_obd *cli = &obd->u.cli;
3200 int nr = atomic_read(&cli->cl_lru_in_list) >> 1;
3201 int target = *(int *)val;
3202
3203 nr = osc_lru_shrink(cli, min(nr, target));
3204 *(int *)val -= nr;
3205 RETURN(0);
3206 }
3207
3208 if (!set && !KEY_IS(KEY_GRANT_SHRINK))
3209 RETURN(-EINVAL);
3210
3211	/* We pass all other commands directly to OST. Since nobody calls osc
3212	 * methods directly and everybody is supposed to go through LOV, we
3213	 * assume lov checked invalid values for us.
3214	 * The only recognised values so far are evict_by_nid and mds_conn.
3215	 * Even if something bad goes through, we'd get a -EINVAL from OST
3216	 * anyway. */
3217
3218 req = ptlrpc_request_alloc(imp, KEY_IS(KEY_GRANT_SHRINK) ?
3219 &RQF_OST_SET_GRANT_INFO :
3220 &RQF_OBD_SET_INFO);
3221 if (req == NULL)
3222 RETURN(-ENOMEM);
3223
3224 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_KEY,
3225 RCL_CLIENT, keylen);
3226 if (!KEY_IS(KEY_GRANT_SHRINK))
3227 req_capsule_set_size(&req->rq_pill, &RMF_SETINFO_VAL,
3228 RCL_CLIENT, vallen);
3229 rc = ptlrpc_request_pack(req, LUSTRE_OST_VERSION, OST_SET_INFO);
3230 if (rc) {
3231 ptlrpc_request_free(req);
3232 RETURN(rc);
3233 }
3234
3235 tmp = req_capsule_client_get(&req->rq_pill, &RMF_SETINFO_KEY);
3236 memcpy(tmp, key, keylen);
3237 tmp = req_capsule_client_get(&req->rq_pill, KEY_IS(KEY_GRANT_SHRINK) ?
3238 &RMF_OST_BODY :
3239 &RMF_SETINFO_VAL);
3240 memcpy(tmp, val, vallen);
3241
3242 if (KEY_IS(KEY_GRANT_SHRINK)) {
3243 struct osc_grant_args *aa;
3244 struct obdo *oa;
3245
3246 CLASSERT(sizeof(*aa) <= sizeof(req->rq_async_args));
3247 aa = ptlrpc_req_async_args(req);
3248 OBDO_ALLOC(oa);
3249 if (!oa) {
3250 ptlrpc_req_finished(req);
3251 RETURN(-ENOMEM);
3252 }
3253 *oa = ((struct ost_body *)val)->oa;
3254 aa->aa_oa = oa;
3255 req->rq_interpret_reply = osc_shrink_grant_interpret;
3256 }
3257
3258 ptlrpc_request_set_replen(req);
3259 if (!KEY_IS(KEY_GRANT_SHRINK)) {
3260 LASSERT(set != NULL);
3261 ptlrpc_set_add_req(set, req);
3262 ptlrpc_check_set(NULL, set);
3263 } else
3264 ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
3265
3266 RETURN(0);
3267}
3268
3269
3270static int osc_llog_init(struct obd_device *obd, struct obd_llog_group *olg,
3271 struct obd_device *disk_obd, int *index)
3272{
3273	/* this code is not supposed to be used with LOD/OSP;
3274	 * it will be removed soon */
3275 LBUG();
3276 return 0;
3277}
3278
3279static int osc_llog_finish(struct obd_device *obd, int count)
3280{
3281 struct llog_ctxt *ctxt;
3282
3283 ENTRY;
3284
3285 ctxt = llog_get_context(obd, LLOG_MDS_OST_ORIG_CTXT);
3286 if (ctxt) {
3287 llog_cat_close(NULL, ctxt->loc_handle);
3288 llog_cleanup(NULL, ctxt);
3289 }
3290
3291 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3292 if (ctxt)
3293 llog_cleanup(NULL, ctxt);
3294 RETURN(0);
3295}
3296
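/* On reconnect, re-negotiate the grant: ask the server for the client's
 * current available grant plus dirty data, or two full BRWs' worth if
 * that sum is zero, and reset the lost-grant accounting. */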
3297static int osc_reconnect(const struct lu_env *env,
3298 struct obd_export *exp, struct obd_device *obd,
3299 struct obd_uuid *cluuid,
3300 struct obd_connect_data *data,
3301 void *localdata)
3302{
3303 struct client_obd *cli = &obd->u.cli;
3304
3305 if (data != NULL && (data->ocd_connect_flags & OBD_CONNECT_GRANT)) {
3306 long lost_grant;
3307
3308 client_obd_list_lock(&cli->cl_loi_list_lock);
3309 data->ocd_grant = (cli->cl_avail_grant + cli->cl_dirty) ?:
3310 2 * cli_brw_size(obd);
3311 lost_grant = cli->cl_lost_grant;
3312 cli->cl_lost_grant = 0;
3313 client_obd_list_unlock(&cli->cl_loi_list_lock);
3314
3315 CDEBUG(D_RPCTRACE, "ocd_connect_flags: "LPX64" ocd_version: %d"
3316 " ocd_grant: %d, lost: %ld.\n", data->ocd_connect_flags,
3317 data->ocd_version, data->ocd_grant, lost_grant);
3318 }
3319
3320 RETURN(0);
3321}
3322
3323static int osc_disconnect(struct obd_export *exp)
3324{
3325 struct obd_device *obd = class_exp2obd(exp);
3326 struct llog_ctxt *ctxt;
3327 int rc;
3328
3329 ctxt = llog_get_context(obd, LLOG_SIZE_REPL_CTXT);
3330 if (ctxt) {
3331 if (obd->u.cli.cl_conn_count == 1) {
3332 /* Flush any remaining cancel messages out to the
3333 * target */
3334 llog_sync(ctxt, exp, 0);
3335 }
3336 llog_ctxt_put(ctxt);
3337 } else {
3338 CDEBUG(D_HA, "No LLOG_SIZE_REPL_CTXT found in obd %p\n",
3339 obd);
3340 }
3341
3342 rc = client_disconnect_export(exp);
3343 /**
3344 * Initially we put del_shrink_grant before disconnect_export, but it
3345 * causes the following problem if setup (connect) and cleanup
3346 * (disconnect) are tangled together.
3347 * connect p1 disconnect p2
3348 * ptlrpc_connect_import
3349 * ............... class_manual_cleanup
3350 * osc_disconnect
3351 * del_shrink_grant
3352 * ptlrpc_connect_interrupt
3353 * init_grant_shrink
3354 * add this client to shrink list
3355 * cleanup_osc
3356	 * Bang! the pinger triggers the shrink.
3357	 * So the osc should only be removed from the shrink list once we
3358	 * are sure the import has been destroyed. BUG18662
3359 */
3360 if (obd->u.cli.cl_import == NULL)
3361 osc_del_shrink_grant(&obd->u.cli);
3362 return rc;
3363}
3364
3365static int osc_import_event(struct obd_device *obd,
3366 struct obd_import *imp,
3367 enum obd_import_event event)
3368{
3369 struct client_obd *cli;
3370 int rc = 0;
3371
3372 ENTRY;
3373 LASSERT(imp->imp_obd == obd);
3374
3375 switch (event) {
3376 case IMP_EVENT_DISCON: {
3377 cli = &obd->u.cli;
3378 client_obd_list_lock(&cli->cl_loi_list_lock);
3379 cli->cl_avail_grant = 0;
3380 cli->cl_lost_grant = 0;
3381 client_obd_list_unlock(&cli->cl_loi_list_lock);
3382 break;
3383 }
3384 case IMP_EVENT_INACTIVE: {
3385 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_INACTIVE, NULL);
3386 break;
3387 }
3388 case IMP_EVENT_INVALIDATE: {
3389 struct ldlm_namespace *ns = obd->obd_namespace;
3390 struct lu_env *env;
3391 int refcheck;
3392
3393 env = cl_env_get(&refcheck);
3394 if (!IS_ERR(env)) {
3395 /* Reset grants */
3396 cli = &obd->u.cli;
3397 /* all pages go to failing rpcs due to the invalid
3398 * import */
3399 osc_io_unplug(env, cli, NULL, PDL_POLICY_ROUND);
3400
3401 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
3402 cl_env_put(env, &refcheck);
3403 } else
3404 rc = PTR_ERR(env);
3405 break;
3406 }
3407 case IMP_EVENT_ACTIVE: {
3408 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVE, NULL);
3409 break;
3410 }
3411 case IMP_EVENT_OCD: {
3412 struct obd_connect_data *ocd = &imp->imp_connect_data;
3413
3414 if (ocd->ocd_connect_flags & OBD_CONNECT_GRANT)
3415 osc_init_grant(&obd->u.cli, ocd);
3416
3417 /* See bug 7198 */
3418 if (ocd->ocd_connect_flags & OBD_CONNECT_REQPORTAL)
3419			imp->imp_client->cli_request_portal = OST_REQUEST_PORTAL;
3420
3421 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_OCD, NULL);
3422 break;
3423 }
3424 case IMP_EVENT_DEACTIVATE: {
3425 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_DEACTIVATE, NULL);
3426 break;
3427 }
3428 case IMP_EVENT_ACTIVATE: {
3429 rc = obd_notify_observer(obd, obd, OBD_NOTIFY_ACTIVATE, NULL);
3430 break;
3431 }
3432 default:
3433 CERROR("Unknown import event %d\n", event);
3434 LBUG();
3435 }
3436 RETURN(rc);
3437}
3438
3439/**
3440 * Determine whether the lock can be canceled before replaying the lock
3441 * during recovery, see bug16774 for detailed information.
3442 *
3443 * \retval zero the lock can't be canceled
3444 * \retval other ok to cancel
3445 */
3446static int osc_cancel_for_recovery(struct ldlm_lock *lock)
3447{
3448 check_res_locked(lock->l_resource);
3449
3450 /*
3451 * Cancel all unused extent lock in granted mode LCK_PR or LCK_CR.
3452 *
3453 * XXX as a future improvement, we can also cancel unused write lock
3454 * if it doesn't have dirty data and active mmaps.
3455 */
3456 if (lock->l_resource->lr_type == LDLM_EXTENT &&
3457 (lock->l_granted_mode == LCK_PR ||
3458 lock->l_granted_mode == LCK_CR) &&
3459 (osc_dlm_lock_pageref(lock) == 0))
3460 RETURN(1);
3461
3462 RETURN(0);
3463}
3464
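/* ptlrpcd work callback: flush any queued writeback for this client. */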
3465static int brw_queue_work(const struct lu_env *env, void *data)
3466{
3467 struct client_obd *cli = data;
3468
3469 CDEBUG(D_CACHE, "Run writeback work for client obd %p.\n", cli);
3470
3471 osc_io_unplug(env, cli, NULL, PDL_POLICY_SAME);
3472 RETURN(0);
3473}
3474
3475int osc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
3476{
3477 struct lprocfs_static_vars lvars = { 0 };
3478 struct client_obd *cli = &obd->u.cli;
3479 void *handler;
3480 int rc;
3481 ENTRY;
3482
3483 rc = ptlrpcd_addref();
3484 if (rc)
3485 RETURN(rc);
3486
3487 rc = client_obd_setup(obd, lcfg);
3488 if (rc)
3489 GOTO(out_ptlrpcd, rc);
3490
3491 handler = ptlrpcd_alloc_work(cli->cl_import, brw_queue_work, cli);
3492 if (IS_ERR(handler))
3493 GOTO(out_client_setup, rc = PTR_ERR(handler));
3494 cli->cl_writeback_work = handler;
3495
3496 rc = osc_quota_setup(obd);
3497 if (rc)
3498 GOTO(out_ptlrpcd_work, rc);
3499
3500 cli->cl_grant_shrink_interval = GRANT_SHRINK_INTERVAL;
3501 lprocfs_osc_init_vars(&lvars);
3502 if (lprocfs_obd_setup(obd, lvars.obd_vars) == 0) {
3503 lproc_osc_attach_seqstat(obd);
3504 sptlrpc_lprocfs_cliobd_attach(obd);
3505 ptlrpc_lprocfs_register_obd(obd);
3506 }
3507
3508	/* We need to allocate a few more requests, because
3509	 * brw_interpret tries to create new requests before freeing
3510	 * previous ones. Ideally we want to have 2x max_rpcs_in_flight
3511	 * reserved, but I'm afraid that might be too much wasted RAM
3512	 * in fact, so 2 is just my guess and should still work. */
3513 cli->cl_import->imp_rq_pool =
3514 ptlrpc_init_rq_pool(cli->cl_max_rpcs_in_flight + 2,
3515 OST_MAXREQSIZE,
3516 ptlrpc_add_rqs_to_pool);
3517
3518 INIT_LIST_HEAD(&cli->cl_grant_shrink_list);
3519 ns_register_cancel(obd->obd_namespace, osc_cancel_for_recovery);
3520 RETURN(rc);
3521
3522out_ptlrpcd_work:
3523 ptlrpcd_destroy_work(handler);
3524out_client_setup:
3525 client_obd_cleanup(obd);
3526out_ptlrpcd:
3527 ptlrpcd_decref();
3528 RETURN(rc);
3529}
3530
3531static int osc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
3532{
3533 int rc = 0;
3534 ENTRY;
3535
3536 switch (stage) {
3537 case OBD_CLEANUP_EARLY: {
3538 struct obd_import *imp;
3539 imp = obd->u.cli.cl_import;
3540 CDEBUG(D_HA, "Deactivating import %s\n", obd->obd_name);
3541 /* ptlrpc_abort_inflight to stop an mds_lov_synchronize */
3542 ptlrpc_deactivate_import(imp);
3543 spin_lock(&imp->imp_lock);
3544 imp->imp_pingable = 0;
3545 spin_unlock(&imp->imp_lock);
3546 break;
3547 }
3548 case OBD_CLEANUP_EXPORTS: {
3549 struct client_obd *cli = &obd->u.cli;
3550 /* LU-464
3551 * for echo client, export may be on zombie list, wait for
3552 * zombie thread to cull it, because cli.cl_import will be
3553 * cleared in client_disconnect_export():
3554 * class_export_destroy() -> obd_cleanup() ->
3555 * echo_device_free() -> echo_client_cleanup() ->
3556 * obd_disconnect() -> osc_disconnect() ->
3557 * client_disconnect_export()
3558 */
3559 obd_zombie_barrier();
3560 if (cli->cl_writeback_work) {
3561 ptlrpcd_destroy_work(cli->cl_writeback_work);
3562 cli->cl_writeback_work = NULL;
3563 }
3564 obd_cleanup_client_import(obd);
3565 ptlrpc_lprocfs_unregister_obd(obd);
3566 lprocfs_obd_cleanup(obd);
3567 rc = obd_llog_finish(obd, 0);
3568 if (rc != 0)
3569 CERROR("failed to cleanup llogging subsystems\n");
3570 break;
3571 }
3572 }
3573 RETURN(rc);
3574}
3575
3576int osc_cleanup(struct obd_device *obd)
3577{
3578 struct client_obd *cli = &obd->u.cli;
3579 int rc;
3580
3581 ENTRY;
3582
3583 /* lru cleanup */
3584 if (cli->cl_cache != NULL) {
3585 LASSERT(atomic_read(&cli->cl_cache->ccc_users) > 0);
3586 spin_lock(&cli->cl_cache->ccc_lru_lock);
3587 list_del_init(&cli->cl_lru_osc);
3588 spin_unlock(&cli->cl_cache->ccc_lru_lock);
3589 cli->cl_lru_left = NULL;
3590 atomic_dec(&cli->cl_cache->ccc_users);
3591 cli->cl_cache = NULL;
3592 }
3593
3594 /* free memory of osc quota cache */
3595 osc_quota_cleanup(obd);
3596
3597 rc = client_obd_cleanup(obd);
3598
3599 ptlrpcd_decref();
3600 RETURN(rc);
3601}
3602
3603int osc_process_config_base(struct obd_device *obd, struct lustre_cfg *lcfg)
3604{
3605 struct lprocfs_static_vars lvars = { 0 };
3606 int rc = 0;
3607
3608 lprocfs_osc_init_vars(&lvars);
3609
3610 switch (lcfg->lcfg_command) {
3611 default:
3612 rc = class_process_proc_param(PARAM_OSC, lvars.obd_vars,
3613 lcfg, obd);
3614 if (rc > 0)
3615 rc = 0;
3616 break;
3617 }
3618
3619 return(rc);
3620}
3621
3622static int osc_process_config(struct obd_device *obd, obd_count len, void *buf)
3623{
3624 return osc_process_config_base(obd, buf);
3625}
3626
3627struct obd_ops osc_obd_ops = {
3628 .o_owner = THIS_MODULE,
3629 .o_setup = osc_setup,
3630 .o_precleanup = osc_precleanup,
3631 .o_cleanup = osc_cleanup,
3632 .o_add_conn = client_import_add_conn,
3633 .o_del_conn = client_import_del_conn,
3634 .o_connect = client_connect_import,
3635 .o_reconnect = osc_reconnect,
3636 .o_disconnect = osc_disconnect,
3637 .o_statfs = osc_statfs,
3638 .o_statfs_async = osc_statfs_async,
3639 .o_packmd = osc_packmd,
3640 .o_unpackmd = osc_unpackmd,
3641 .o_create = osc_create,
3642 .o_destroy = osc_destroy,
3643 .o_getattr = osc_getattr,
3644 .o_getattr_async = osc_getattr_async,
3645 .o_setattr = osc_setattr,
3646 .o_setattr_async = osc_setattr_async,
3647 .o_brw = osc_brw,
3648 .o_punch = osc_punch,
3649 .o_sync = osc_sync,
3650 .o_enqueue = osc_enqueue,
3651 .o_change_cbdata = osc_change_cbdata,
3652 .o_find_cbdata = osc_find_cbdata,
3653 .o_cancel = osc_cancel,
3654 .o_cancel_unused = osc_cancel_unused,
3655 .o_iocontrol = osc_iocontrol,
3656 .o_get_info = osc_get_info,
3657 .o_set_info_async = osc_set_info_async,
3658 .o_import_event = osc_import_event,
3659 .o_llog_init = osc_llog_init,
3660 .o_llog_finish = osc_llog_finish,
3661 .o_process_config = osc_process_config,
3662 .o_quotactl = osc_quotactl,
3663 .o_quotacheck = osc_quotacheck,
3664};
3665
3666extern struct lu_kmem_descr osc_caches[];
3667extern spinlock_t osc_ast_guard;
3668extern struct lock_class_key osc_ast_guard_class;
3669
3670int __init osc_init(void)
3671{
3672 struct lprocfs_static_vars lvars = { 0 };
3673 int rc;
3674 ENTRY;
3675
3676 /* print an address of _any_ initialized kernel symbol from this
3677 * module, to allow debugging with gdb that doesn't support data
3678 * symbols from modules.*/
3679 CDEBUG(D_INFO, "Lustre OSC module (%p).\n", &osc_caches);
3680
3681 rc = lu_kmem_init(osc_caches);
3682
3683 lprocfs_osc_init_vars(&lvars);
3684
3685 rc = class_register_type(&osc_obd_ops, NULL, lvars.module_vars,
3686 LUSTRE_OSC_NAME, &osc_device_type);
3687 if (rc) {
3688 lu_kmem_fini(osc_caches);
3689 RETURN(rc);
3690 }
3691
3692 spin_lock_init(&osc_ast_guard);
3693 lockdep_set_class(&osc_ast_guard, &osc_ast_guard_class);
3694
3695 RETURN(rc);
3696}
3697
3698static void /*__exit*/ osc_exit(void)
3699{
3700 class_unregister_type(LUSTRE_OSC_NAME);
3701 lu_kmem_fini(osc_caches);
3702}
3703
3704MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
3705MODULE_DESCRIPTION("Lustre Object Storage Client (OSC)");
3706MODULE_LICENSE("GPL");
3707
3708cfs_module(osc, LUSTRE_VERSION_STRING, osc_init, osc_exit);