Merge branch 'pm-cpufreq'
[linux-2.6-block.git] / drivers / staging / lustre / lustre / mgc / mgc_request.c
CommitLineData
d7e09d03
PT
1/*
2 * GPL HEADER START
3 *
4 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
5 *
6 * This program is free software; you can redistribute it and/or modify
7 * it under the terms of the GNU General Public License version 2 only,
8 * as published by the Free Software Foundation.
9 *
10 * This program is distributed in the hope that it will be useful, but
11 * WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
13 * General Public License version 2 for more details (a copy is included
14 * in the LICENSE file that accompanied this code).
15 *
16 * You should have received a copy of the GNU General Public License
17 * version 2 along with this program; If not, see
18 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
19 *
20 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
21 * CA 95054 USA or visit www.sun.com if you need additional information or
22 * have any questions.
23 *
24 * GPL HEADER END
25 */
26/*
27 * Copyright (c) 2007, 2010, Oracle and/or its affiliates. All rights reserved.
28 * Use is subject to license terms.
29 *
30 * Copyright (c) 2011, 2012, Intel Corporation.
31 */
32/*
33 * This file is part of Lustre, http://www.lustre.org/
34 * Lustre is a trademark of Sun Microsystems, Inc.
35 *
36 * lustre/mgc/mgc_request.c
37 *
38 * Author: Nathan Rutman <nathan@clusterfs.com>
39 */
40
41#define DEBUG_SUBSYSTEM S_MGC
42#define D_MGC D_CONFIG /*|D_WARNING*/
43
aa4e3c8a 44#include <linux/module.h>
73060ed9
GKH
45#include "../include/obd_class.h"
46#include "../include/lustre_dlm.h"
47#include "../include/lprocfs_status.h"
48#include "../include/lustre_log.h"
49#include "../include/lustre_disk.h"
aa4e3c8a 50
d7e09d03
PT
51#include "mgc_internal.h"
52
53static int mgc_name2resid(char *name, int len, struct ldlm_res_id *res_id,
54 int type)
55{
56 __u64 resname = 0;
57
7d4bae45 58 if (len > sizeof(resname)) {
d7e09d03
PT
59 CERROR("name too long: %s\n", name);
60 return -EINVAL;
61 }
62 if (len <= 0) {
63 CERROR("missing name: %s\n", name);
64 return -EINVAL;
65 }
66 memcpy(&resname, name, len);
67
68 /* Always use the same endianness for the resid */
69 memset(res_id, 0, sizeof(*res_id));
70 res_id->name[0] = cpu_to_le64(resname);
71 /* XXX: unfortunately, sptlprc and config llog share one lock */
37821997 72 switch (type) {
d7e09d03
PT
73 case CONFIG_T_CONFIG:
74 case CONFIG_T_SPTLRPC:
75 resname = 0;
76 break;
77 case CONFIG_T_RECOVER:
7d4bae45 78 case CONFIG_T_PARAMS:
d7e09d03
PT
79 resname = type;
80 break;
81 default:
82 LBUG();
83 }
84 res_id->name[1] = cpu_to_le64(resname);
55f5a824 85 CDEBUG(D_MGC, "log %s to resid %#llx/%#llx (%.8s)\n", name,
d7e09d03
PT
86 res_id->name[0], res_id->name[1], (char *)&res_id->name[0]);
87 return 0;
88}
89
/*
 * Build the resource id for a whole filesystem name.
 *
 * fsname is at most 8 chars long and may contain "-",
 * e.g. "lustre", "SUN-000"; the complete string is used as the name.
 * Returns whatever mgc_name2resid() returns.
 */
int mgc_fsname2resid(char *fsname, struct ldlm_res_id *res_id, int type)
{
	int fsname_len = strlen(fsname);

	return mgc_name2resid(fsname, fsname_len, res_id, type);
}
EXPORT_SYMBOL(mgc_fsname2resid);
97
/*
 * Build the resource id for a log name.
 *
 * logname consists of "fsname-nodetype", e.g. "lustre-MDT0001" or
 * "SUN-000-client"; only the part before the LAST '-' is used.
 * The exception is a name without any '-' (the "params" llog), which is
 * used in full.
 */
static int mgc_logname2resid(char *logname, struct ldlm_res_id *res_id, int type)
{
	char *dash = strrchr(logname, '-');
	int prefix_len = dash ? dash - logname : strlen(logname);

	return mgc_name2resid(logname, prefix_len, res_id, type);
}
113
/********************** config llog list **********************/
/* config_list_lock guards config_llog_list; the requeue code below also
 * takes it around updates of rq_state. */
static DEFINE_SPINLOCK(config_list_lock);
/* Every config_llog_data created by do_config_log_add() is linked here. */
static LIST_HEAD(config_llog_list);
117
118/* Take a reference to a config log */
119static int config_log_get(struct config_llog_data *cld)
120{
d7e09d03
PT
121 atomic_inc(&cld->cld_refcount);
122 CDEBUG(D_INFO, "log %s refs %d\n", cld->cld_logname,
123 atomic_read(&cld->cld_refcount));
0a3bdb00 124 return 0;
d7e09d03
PT
125}
126
127/* Drop a reference to a config log. When no longer referenced,
128 we can free the config log data */
129static void config_log_put(struct config_llog_data *cld)
130{
d7e09d03
PT
131 CDEBUG(D_INFO, "log %s refs %d\n", cld->cld_logname,
132 atomic_read(&cld->cld_refcount));
133 LASSERT(atomic_read(&cld->cld_refcount) > 0);
134
135 /* spinlock to make sure no item with 0 refcount in the list */
136 if (atomic_dec_and_lock(&cld->cld_refcount, &config_list_lock)) {
137 list_del(&cld->cld_list_chain);
138 spin_unlock(&config_list_lock);
139
140 CDEBUG(D_MGC, "dropping config log %s\n", cld->cld_logname);
141
142 if (cld->cld_recover)
143 config_log_put(cld->cld_recover);
144 if (cld->cld_sptlrpc)
145 config_log_put(cld->cld_sptlrpc);
7d4bae45
AB
146 if (cld->cld_params)
147 config_log_put(cld->cld_params);
d7e09d03
PT
148 if (cld_is_sptlrpc(cld))
149 sptlrpc_conf_log_stop(cld->cld_logname);
150
151 class_export_put(cld->cld_mgcexp);
c9b4297f 152 kfree(cld);
d7e09d03 153 }
d7e09d03
PT
154}
155
156/* Find a config log by name */
157static
158struct config_llog_data *config_log_find(char *logname,
159 struct config_llog_instance *cfg)
160{
161 struct config_llog_data *cld;
162 struct config_llog_data *found = NULL;
17fc7f9b 163 void *instance;
d7e09d03
PT
164
165 LASSERT(logname != NULL);
166
167 instance = cfg ? cfg->cfg_instance : NULL;
168 spin_lock(&config_list_lock);
169 list_for_each_entry(cld, &config_llog_list, cld_list_chain) {
170 /* check if instance equals */
171 if (instance != cld->cld_cfg.cfg_instance)
172 continue;
173
174 /* instance may be NULL, should check name */
175 if (strcmp(logname, cld->cld_logname) == 0) {
176 found = cld;
177 break;
178 }
179 }
180 if (found) {
181 atomic_inc(&found->cld_refcount);
182 LASSERT(found->cld_stopping == 0 || cld_is_sptlrpc(found) == 0);
183 }
184 spin_unlock(&config_list_lock);
0a3bdb00 185 return found;
d7e09d03
PT
186}
187
188static
189struct config_llog_data *do_config_log_add(struct obd_device *obd,
190 char *logname,
191 int type,
192 struct config_llog_instance *cfg,
193 struct super_block *sb)
194{
195 struct config_llog_data *cld;
196 int rc;
d7e09d03
PT
197
198 CDEBUG(D_MGC, "do adding config log %s:%p\n", logname,
ea7893bb 199 cfg ? cfg->cfg_instance : NULL);
d7e09d03 200
c9b4297f 201 cld = kzalloc(sizeof(*cld) + strlen(logname) + 1, GFP_NOFS);
d7e09d03 202 if (!cld)
0a3bdb00 203 return ERR_PTR(-ENOMEM);
d7e09d03
PT
204
205 strcpy(cld->cld_logname, logname);
206 if (cfg)
207 cld->cld_cfg = *cfg;
208 else
209 cld->cld_cfg.cfg_callback = class_config_llog_handler;
210 mutex_init(&cld->cld_lock);
211 cld->cld_cfg.cfg_last_idx = 0;
212 cld->cld_cfg.cfg_flags = 0;
213 cld->cld_cfg.cfg_sb = sb;
214 cld->cld_type = type;
215 atomic_set(&cld->cld_refcount, 1);
216
217 /* Keep the mgc around until we are done */
218 cld->cld_mgcexp = class_export_get(obd->obd_self_export);
219
220 if (cld_is_sptlrpc(cld)) {
221 sptlrpc_conf_log_start(logname);
222 cld->cld_cfg.cfg_obdname = obd->obd_name;
223 }
224
225 rc = mgc_logname2resid(logname, &cld->cld_resid, type);
226
227 spin_lock(&config_list_lock);
228 list_add(&cld->cld_list_chain, &config_llog_list);
229 spin_unlock(&config_list_lock);
230
231 if (rc) {
232 config_log_put(cld);
0a3bdb00 233 return ERR_PTR(rc);
d7e09d03
PT
234 }
235
236 if (cld_is_sptlrpc(cld)) {
237 rc = mgc_process_log(obd, cld);
238 if (rc && rc != -ENOENT)
239 CERROR("failed processing sptlrpc log: %d\n", rc);
240 }
241
0a3bdb00 242 return cld;
d7e09d03
PT
243}
244
245static struct config_llog_data *config_recover_log_add(struct obd_device *obd,
246 char *fsname,
247 struct config_llog_instance *cfg,
248 struct super_block *sb)
249{
250 struct config_llog_instance lcfg = *cfg;
251 struct lustre_sb_info *lsi = s2lsi(sb);
252 struct config_llog_data *cld;
253 char logname[32];
254
255 if (IS_OST(lsi))
256 return NULL;
257
258 /* for osp-on-ost, see lustre_start_osp() */
259 if (IS_MDT(lsi) && lcfg.cfg_instance)
260 return NULL;
261
262 /* we have to use different llog for clients and mdts for cmd
263 * where only clients are notified if one of cmd server restarts */
264 LASSERT(strlen(fsname) < sizeof(logname) / 2);
265 strcpy(logname, fsname);
266 if (IS_SERVER(lsi)) { /* mdt */
267 LASSERT(lcfg.cfg_instance == NULL);
268 lcfg.cfg_instance = sb;
269 strcat(logname, "-mdtir");
270 } else {
271 LASSERT(lcfg.cfg_instance != NULL);
272 strcat(logname, "-cliir");
273 }
274
275 cld = do_config_log_add(obd, logname, CONFIG_T_RECOVER, &lcfg, sb);
276 return cld;
277}
278
7d4bae45
AB
279static struct config_llog_data *config_params_log_add(struct obd_device *obd,
280 struct config_llog_instance *cfg, struct super_block *sb)
281{
282 struct config_llog_instance lcfg = *cfg;
283 struct config_llog_data *cld;
284
285 lcfg.cfg_instance = sb;
286
287 cld = do_config_log_add(obd, PARAMS_FILENAME, CONFIG_T_PARAMS,
288 &lcfg, sb);
289
290 return cld;
291}
d7e09d03
PT
292
293/** Add this log to the list of active logs watched by an MGC.
294 * Active means we're watching for updates.
295 * We have one active log per "mount" - client instance or servername.
296 * Each instance may be at a different point in the log.
297 */
298static int config_log_add(struct obd_device *obd, char *logname,
299 struct config_llog_instance *cfg,
300 struct super_block *sb)
301{
302 struct lustre_sb_info *lsi = s2lsi(sb);
303 struct config_llog_data *cld;
304 struct config_llog_data *sptlrpc_cld;
7d4bae45
AB
305 struct config_llog_data *params_cld;
306 char seclogname[32];
307 char *ptr;
308 int rc;
d7e09d03
PT
309
310 CDEBUG(D_MGC, "adding config log %s:%p\n", logname, cfg->cfg_instance);
311
312 /*
313 * for each regular log, the depended sptlrpc log name is
314 * <fsname>-sptlrpc. multiple regular logs may share one sptlrpc log.
315 */
316 ptr = strrchr(logname, '-');
317 if (ptr == NULL || ptr - logname > 8) {
318 CERROR("logname %s is too long\n", logname);
0a3bdb00 319 return -EINVAL;
d7e09d03
PT
320 }
321
322 memcpy(seclogname, logname, ptr - logname);
323 strcpy(seclogname + (ptr - logname), "-sptlrpc");
324
325 sptlrpc_cld = config_log_find(seclogname, NULL);
326 if (sptlrpc_cld == NULL) {
327 sptlrpc_cld = do_config_log_add(obd, seclogname,
328 CONFIG_T_SPTLRPC, NULL, NULL);
329 if (IS_ERR(sptlrpc_cld)) {
330 CERROR("can't create sptlrpc log: %s\n", seclogname);
74d3ba98
JL
331 rc = PTR_ERR(sptlrpc_cld);
332 goto out_err;
d7e09d03
PT
333 }
334 }
7d4bae45
AB
335 params_cld = config_params_log_add(obd, cfg, sb);
336 if (IS_ERR(params_cld)) {
337 rc = PTR_ERR(params_cld);
338 CERROR("%s: can't create params log: rc = %d\n",
339 obd->obd_name, rc);
74d3ba98 340 goto out_err1;
7d4bae45 341 }
d7e09d03
PT
342
343 cld = do_config_log_add(obd, logname, CONFIG_T_CONFIG, cfg, sb);
344 if (IS_ERR(cld)) {
345 CERROR("can't create log: %s\n", logname);
74d3ba98
JL
346 rc = PTR_ERR(cld);
347 goto out_err2;
d7e09d03
PT
348 }
349
350 cld->cld_sptlrpc = sptlrpc_cld;
7d4bae45 351 cld->cld_params = params_cld;
d7e09d03
PT
352
353 LASSERT(lsi->lsi_lmd);
354 if (!(lsi->lsi_lmd->lmd_flags & LMD_FLG_NOIR)) {
355 struct config_llog_data *recover_cld;
356 *strrchr(seclogname, '-') = 0;
357 recover_cld = config_recover_log_add(obd, seclogname, cfg, sb);
74d3ba98
JL
358 if (IS_ERR(recover_cld)) {
359 rc = PTR_ERR(recover_cld);
360 goto out_err3;
361 }
d7e09d03
PT
362 cld->cld_recover = recover_cld;
363 }
364
0a3bdb00 365 return 0;
7d4bae45
AB
366
367out_err3:
368 config_log_put(cld);
369
370out_err2:
371 config_log_put(params_cld);
372
373out_err1:
374 config_log_put(sptlrpc_cld);
375
376out_err:
377 return rc;
d7e09d03
PT
378}
379
/* Non-static mutex; nothing in this part of the file takes it.
 * NOTE(review): presumably serialises llog processing elsewhere - confirm. */
DEFINE_MUTEX(llog_process_lock);
381
382/** Stop watching for updates on this log.
383 */
384static int config_log_end(char *logname, struct config_llog_instance *cfg)
385{
386 struct config_llog_data *cld;
387 struct config_llog_data *cld_sptlrpc = NULL;
7d4bae45 388 struct config_llog_data *cld_params = NULL;
d7e09d03
PT
389 struct config_llog_data *cld_recover = NULL;
390 int rc = 0;
d7e09d03
PT
391
392 cld = config_log_find(logname, cfg);
393 if (cld == NULL)
0a3bdb00 394 return -ENOENT;
d7e09d03
PT
395
396 mutex_lock(&cld->cld_lock);
397 /*
398 * if cld_stopping is set, it means we didn't start the log thus
399 * not owning the start ref. this can happen after previous umount:
400 * the cld still hanging there waiting for lock cancel, and we
401 * remount again but failed in the middle and call log_end without
402 * calling start_log.
403 */
404 if (unlikely(cld->cld_stopping)) {
405 mutex_unlock(&cld->cld_lock);
406 /* drop the ref from the find */
407 config_log_put(cld);
0a3bdb00 408 return rc;
d7e09d03
PT
409 }
410
411 cld->cld_stopping = 1;
412
413 cld_recover = cld->cld_recover;
414 cld->cld_recover = NULL;
415 mutex_unlock(&cld->cld_lock);
416
417 if (cld_recover) {
418 mutex_lock(&cld_recover->cld_lock);
419 cld_recover->cld_stopping = 1;
420 mutex_unlock(&cld_recover->cld_lock);
421 config_log_put(cld_recover);
422 }
423
424 spin_lock(&config_list_lock);
425 cld_sptlrpc = cld->cld_sptlrpc;
426 cld->cld_sptlrpc = NULL;
7d4bae45
AB
427 cld_params = cld->cld_params;
428 cld->cld_params = NULL;
d7e09d03
PT
429 spin_unlock(&config_list_lock);
430
431 if (cld_sptlrpc)
432 config_log_put(cld_sptlrpc);
433
7d4bae45
AB
434 if (cld_params) {
435 mutex_lock(&cld_params->cld_lock);
436 cld_params->cld_stopping = 1;
437 mutex_unlock(&cld_params->cld_lock);
438 config_log_put(cld_params);
439 }
440
d7e09d03
PT
441 /* drop the ref from the find */
442 config_log_put(cld);
443 /* drop the start ref */
444 config_log_put(cld);
445
446 CDEBUG(D_MGC, "end config log %s (%d)\n", logname ? logname : "client",
447 rc);
0a3bdb00 448 return rc;
d7e09d03
PT
449}
450
73bb1da6 451int lprocfs_mgc_rd_ir_state(struct seq_file *m, void *data)
d7e09d03
PT
452{
453 struct obd_device *obd = data;
1c8aa54a
HZ
454 struct obd_import *imp;
455 struct obd_connect_data *ocd;
d7e09d03 456 struct config_llog_data *cld;
d7e09d03 457
1c8aa54a
HZ
458 LPROCFS_CLIMP_CHECK(obd);
459 imp = obd->u.cli.cl_import;
460 ocd = &imp->imp_connect_data;
461
73bb1da6 462 seq_printf(m, "imperative_recovery: %s\n",
d7e09d03 463 OCD_HAS_FLAG(ocd, IMP_RECOV) ? "ENABLED" : "DISABLED");
73bb1da6 464 seq_printf(m, "client_state:\n");
d7e09d03
PT
465
466 spin_lock(&config_list_lock);
467 list_for_each_entry(cld, &config_llog_list, cld_list_chain) {
468 if (cld->cld_recover == NULL)
469 continue;
73bb1da6 470 seq_printf(m, " - { client: %s, nidtbl_version: %u }\n",
d7e09d03
PT
471 cld->cld_logname,
472 cld->cld_recover->cld_cfg.cfg_last_idx);
473 }
474 spin_unlock(&config_list_lock);
475
1c8aa54a 476 LPROCFS_CLIMP_EXIT(obd);
0a3bdb00 477 return 0;
d7e09d03
PT
478}
479
480/* reenqueue any lost locks */
481#define RQ_RUNNING 0x1
482#define RQ_NOW 0x2
483#define RQ_LATER 0x4
484#define RQ_STOP 0x8
4345abb2 485#define RQ_PRECLEANUP 0x10
225f597c 486static int rq_state;
d7e09d03
PT
487static wait_queue_head_t rq_waitq;
488static DECLARE_COMPLETION(rq_exit);
4345abb2 489static DECLARE_COMPLETION(rq_start);
d7e09d03
PT
490
491static void do_requeue(struct config_llog_data *cld)
492{
d7e09d03
PT
493 LASSERT(atomic_read(&cld->cld_refcount) > 0);
494
495 /* Do not run mgc_process_log on a disconnected export or an
496 export which is being disconnected. Take the client
497 semaphore to make the check non-racy. */
498 down_read(&cld->cld_mgcexp->exp_obd->u.cli.cl_sem);
499 if (cld->cld_mgcexp->exp_obd->u.cli.cl_conn_count != 0) {
500 CDEBUG(D_MGC, "updating log %s\n", cld->cld_logname);
501 mgc_process_log(cld->cld_mgcexp->exp_obd, cld);
502 } else {
503 CDEBUG(D_MGC, "disconnecting, won't update log %s\n",
504 cld->cld_logname);
505 }
506 up_read(&cld->cld_mgcexp->exp_obd->u.cli.cl_sem);
d7e09d03
PT
507}
508
509/* this timeout represents how many seconds MGC should wait before
510 * requeue config and recover lock to the MGS. We need to randomize this
511 * in order to not flood the MGS.
512 */
513#define MGC_TIMEOUT_MIN_SECONDS 5
514#define MGC_TIMEOUT_RAND_CENTISEC 0x1ff /* ~500 */
515
516static int mgc_requeue_thread(void *data)
517{
4345abb2
BJ
518 bool first = true;
519
d7e09d03
PT
520 CDEBUG(D_MGC, "Starting requeue thread\n");
521
522 /* Keep trying failed locks periodically */
523 spin_lock(&config_list_lock);
524 rq_state |= RQ_RUNNING;
525 while (1) {
526 struct l_wait_info lwi;
527 struct config_llog_data *cld, *cld_prev;
528 int rand = cfs_rand() & MGC_TIMEOUT_RAND_CENTISEC;
529 int stopped = !!(rq_state & RQ_STOP);
530 int to;
531
532 /* Any new or requeued lostlocks will change the state */
533 rq_state &= ~(RQ_NOW | RQ_LATER);
534 spin_unlock(&config_list_lock);
535
4345abb2
BJ
536 if (first) {
537 first = false;
538 complete(&rq_start);
539 }
540
d7e09d03
PT
541 /* Always wait a few seconds to allow the server who
542 caused the lock revocation to finish its setup, plus some
543 random so everyone doesn't try to reconnect at once. */
544 to = MGC_TIMEOUT_MIN_SECONDS * HZ;
545 to += rand * HZ / 100; /* rand is centi-seconds */
546 lwi = LWI_TIMEOUT(to, NULL, NULL);
4345abb2
BJ
547 l_wait_event(rq_waitq, rq_state & (RQ_STOP | RQ_PRECLEANUP),
548 &lwi);
d7e09d03
PT
549
550 /*
551 * iterate & processing through the list. for each cld, process
552 * its depending sptlrpc cld firstly (if any) and then itself.
553 *
554 * it's guaranteed any item in the list must have
555 * reference > 0; and if cld_lostlock is set, at
556 * least one reference is taken by the previous enqueue.
557 */
558 cld_prev = NULL;
559
560 spin_lock(&config_list_lock);
4345abb2 561 rq_state &= ~RQ_PRECLEANUP;
d7e09d03
PT
562 list_for_each_entry(cld, &config_llog_list,
563 cld_list_chain) {
564 if (!cld->cld_lostlock)
565 continue;
566
567 spin_unlock(&config_list_lock);
568
569 LASSERT(atomic_read(&cld->cld_refcount) > 0);
570
571 /* Whether we enqueued again or not in mgc_process_log,
572 * we're done with the ref from the old enqueue */
573 if (cld_prev)
574 config_log_put(cld_prev);
575 cld_prev = cld;
576
577 cld->cld_lostlock = 0;
578 if (likely(!stopped))
579 do_requeue(cld);
580
581 spin_lock(&config_list_lock);
582 }
583 spin_unlock(&config_list_lock);
584 if (cld_prev)
585 config_log_put(cld_prev);
586
587 /* break after scanning the list so that we can drop
588 * refcount to losing lock clds */
589 if (unlikely(stopped)) {
590 spin_lock(&config_list_lock);
591 break;
592 }
593
594 /* Wait a bit to see if anyone else needs a requeue */
595 lwi = (struct l_wait_info) { 0 };
596 l_wait_event(rq_waitq, rq_state & (RQ_NOW | RQ_STOP),
597 &lwi);
598 spin_lock(&config_list_lock);
599 }
600 /* spinlock and while guarantee RQ_NOW and RQ_LATER are not set */
601 rq_state &= ~RQ_RUNNING;
602 spin_unlock(&config_list_lock);
603
604 complete(&rq_exit);
605
606 CDEBUG(D_MGC, "Ending requeue thread\n");
84827278 607 return 0;
d7e09d03
PT
608}
609
610/* Add a cld to the list to requeue. Start the requeue thread if needed.
611 We are responsible for dropping the config log reference from here on out. */
612static void mgc_requeue_add(struct config_llog_data *cld)
613{
d7e09d03
PT
614 CDEBUG(D_INFO, "log %s: requeue (r=%d sp=%d st=%x)\n",
615 cld->cld_logname, atomic_read(&cld->cld_refcount),
616 cld->cld_stopping, rq_state);
617 LASSERT(atomic_read(&cld->cld_refcount) > 0);
618
619 mutex_lock(&cld->cld_lock);
620 if (cld->cld_stopping || cld->cld_lostlock) {
621 mutex_unlock(&cld->cld_lock);
e05e02e4 622 return;
d7e09d03
PT
623 }
624 /* this refcount will be released in mgc_requeue_thread. */
625 config_log_get(cld);
626 cld->cld_lostlock = 1;
627 mutex_unlock(&cld->cld_lock);
628
629 /* Hold lock for rq_state */
630 spin_lock(&config_list_lock);
631 if (rq_state & RQ_STOP) {
632 spin_unlock(&config_list_lock);
633 cld->cld_lostlock = 0;
634 config_log_put(cld);
635 } else {
636 rq_state |= RQ_NOW;
637 spin_unlock(&config_list_lock);
638 wake_up(&rq_waitq);
639 }
d7e09d03
PT
640}
641
aa4e3c8a
MP
642static int mgc_llog_init(const struct lu_env *env, struct obd_device *obd)
643{
644 struct llog_ctxt *ctxt;
645 int rc;
646
647 /* setup only remote ctxt, the local disk context is switched per each
648 * filesystem during mgc_fs_setup() */
649 rc = llog_setup(env, obd, &obd->obd_olg, LLOG_CONFIG_REPL_CTXT, obd,
650 &llog_client_ops);
651 if (rc)
652 return rc;
653
654 ctxt = llog_get_context(obd, LLOG_CONFIG_REPL_CTXT);
655 LASSERT(ctxt);
656
657 llog_initiator_connect(ctxt);
658 llog_ctxt_put(ctxt);
659
660 return 0;
661}
662
663static int mgc_llog_fini(const struct lu_env *env, struct obd_device *obd)
664{
665 struct llog_ctxt *ctxt;
666
667 ctxt = llog_get_context(obd, LLOG_CONFIG_REPL_CTXT);
668 if (ctxt)
669 llog_cleanup(env, ctxt);
670
671 return 0;
d7e09d03
PT
672}
673
674static atomic_t mgc_count = ATOMIC_INIT(0);
675static int mgc_precleanup(struct obd_device *obd, enum obd_cleanup_stage stage)
676{
677 int rc = 0;
4345abb2 678 int temp;
d7e09d03
PT
679
680 switch (stage) {
681 case OBD_CLEANUP_EARLY:
682 break;
683 case OBD_CLEANUP_EXPORTS:
684 if (atomic_dec_and_test(&mgc_count)) {
4345abb2 685 LASSERT(rq_state & RQ_RUNNING);
d7e09d03 686 /* stop requeue thread */
4345abb2
BJ
687 temp = RQ_STOP;
688 } else {
689 /* wakeup requeue thread to clean our cld */
690 temp = RQ_NOW | RQ_PRECLEANUP;
d7e09d03 691 }
4345abb2
BJ
692 spin_lock(&config_list_lock);
693 rq_state |= temp;
694 spin_unlock(&config_list_lock);
695 wake_up(&rq_waitq);
696 if (temp & RQ_STOP)
697 wait_for_completion(&rq_exit);
d7e09d03 698 obd_cleanup_client_import(obd);
aa4e3c8a 699 rc = mgc_llog_fini(NULL, obd);
d7e09d03
PT
700 if (rc != 0)
701 CERROR("failed to cleanup llogging subsystems\n");
702 break;
703 }
0a3bdb00 704 return rc;
d7e09d03
PT
705}
706
707static int mgc_cleanup(struct obd_device *obd)
708{
d7e09d03
PT
709 /* COMPAT_146 - old config logs may have added profiles we don't
710 know about */
711 if (obd->obd_type->typ_refcnt <= 1)
712 /* Only for the last mgc */
713 class_del_profiles();
714
715 lprocfs_obd_cleanup(obd);
716 ptlrpcd_decref();
717
aba5c139 718 return client_obd_cleanup(obd);
d7e09d03
PT
719}
720
721static int mgc_setup(struct obd_device *obd, struct lustre_cfg *lcfg)
722{
9b801302 723 struct lprocfs_static_vars lvars = { NULL };
d7e09d03 724 int rc;
d7e09d03
PT
725
726 ptlrpcd_addref();
727
728 rc = client_obd_setup(obd, lcfg);
729 if (rc)
74d3ba98 730 goto err_decref;
d7e09d03 731
aa4e3c8a 732 rc = mgc_llog_init(NULL, obd);
d7e09d03
PT
733 if (rc) {
734 CERROR("failed to setup llogging subsystems\n");
74d3ba98 735 goto err_cleanup;
d7e09d03
PT
736 }
737
738 lprocfs_mgc_init_vars(&lvars);
9b801302 739 lprocfs_obd_setup(obd, lvars.obd_vars, lvars.sysfs_vars);
d7e09d03
PT
740 sptlrpc_lprocfs_cliobd_attach(obd);
741
742 if (atomic_inc_return(&mgc_count) == 1) {
743 rq_state = 0;
744 init_waitqueue_head(&rq_waitq);
745
746 /* start requeue thread */
747 rc = PTR_ERR(kthread_run(mgc_requeue_thread, NULL,
748 "ll_cfg_requeue"));
749 if (IS_ERR_VALUE(rc)) {
2d00bd17 750 CERROR("%s: Cannot start requeue thread (%d),no more log updates!\n",
d7e09d03 751 obd->obd_name, rc);
74d3ba98 752 goto err_cleanup;
d7e09d03
PT
753 }
754 /* rc is the task_struct pointer of mgc_requeue_thread. */
755 rc = 0;
4345abb2 756 wait_for_completion(&rq_start);
d7e09d03
PT
757 }
758
0a3bdb00 759 return rc;
d7e09d03
PT
760
761err_cleanup:
762 client_obd_cleanup(obd);
763err_decref:
764 ptlrpcd_decref();
0a3bdb00 765 return rc;
d7e09d03
PT
766}
767
768/* based on ll_mdc_blocking_ast */
769static int mgc_blocking_ast(struct ldlm_lock *lock, struct ldlm_lock_desc *desc,
770 void *data, int flag)
771{
772 struct lustre_handle lockh;
773 struct config_llog_data *cld = (struct config_llog_data *)data;
774 int rc = 0;
d7e09d03
PT
775
776 switch (flag) {
777 case LDLM_CB_BLOCKING:
778 /* mgs wants the lock, give it up... */
779 LDLM_DEBUG(lock, "MGC blocking CB");
780 ldlm_lock2handle(lock, &lockh);
781 rc = ldlm_cli_cancel(&lockh, LCF_ASYNC);
782 break;
783 case LDLM_CB_CANCELING:
784 /* We've given up the lock, prepare ourselves to update. */
785 LDLM_DEBUG(lock, "MGC cancel CB");
786
6d95e048
AD
787 CDEBUG(D_MGC, "Lock res "DLDLMRES" (%.8s)\n",
788 PLDLMRES(lock->l_resource),
d7e09d03
PT
789 (char *)&lock->l_resource->lr_name.name[0]);
790
791 if (!cld) {
792 CDEBUG(D_INFO, "missing data, won't requeue\n");
793 break;
794 }
795
796 /* held at mgc_process_log(). */
797 LASSERT(atomic_read(&cld->cld_refcount) > 0);
798 /* Are we done with this log? */
799 if (cld->cld_stopping) {
800 CDEBUG(D_MGC, "log %s: stopping, won't requeue\n",
801 cld->cld_logname);
802 config_log_put(cld);
803 break;
804 }
805 /* Make sure not to re-enqueue when the mgc is stopping
806 (we get called from client_disconnect_export) */
807 if (!lock->l_conn_export ||
808 !lock->l_conn_export->exp_obd->u.cli.cl_conn_count) {
809 CDEBUG(D_MGC, "log %.8s: disconnecting, won't requeue\n",
810 cld->cld_logname);
811 config_log_put(cld);
812 break;
813 }
814
815 /* Re-enqueue now */
816 mgc_requeue_add(cld);
817 config_log_put(cld);
818 break;
819 default:
820 LBUG();
821 }
822
0a3bdb00 823 return rc;
d7e09d03
PT
824}
825
/* Not sure where this should go... */
/* This is the timeout value for MGS_CONNECT request plus a ping interval, such
 * that we can have a chance to try the secondary MGS if any. */
#define MGC_ENQUEUE_LIMIT \
	(INITIAL_CONNECT_TIMEOUT + (AT_OFF ? 0 : at_min) + PING_INTERVAL)
/* rq_delay_limit used for MGS_TARGET_REG requests */
#define MGC_TARGET_REG_LIMIT 10
/* rq_delay_limit used for MGS_SET_INFO requests */
#define MGC_SEND_PARAM_LIMIT 10
833
834/* Send parameter to MGS*/
835static int mgc_set_mgs_param(struct obd_export *exp,
836 struct mgs_send_param *msp)
837{
838 struct ptlrpc_request *req;
839 struct mgs_send_param *req_msp, *rep_msp;
840 int rc;
d7e09d03
PT
841
842 req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp),
843 &RQF_MGS_SET_INFO, LUSTRE_MGS_VERSION,
844 MGS_SET_INFO);
845 if (!req)
0a3bdb00 846 return -ENOMEM;
d7e09d03
PT
847
848 req_msp = req_capsule_client_get(&req->rq_pill, &RMF_MGS_SEND_PARAM);
849 if (!req_msp) {
850 ptlrpc_req_finished(req);
0a3bdb00 851 return -ENOMEM;
d7e09d03
PT
852 }
853
854 memcpy(req_msp, msp, sizeof(*req_msp));
855 ptlrpc_request_set_replen(req);
856
857 /* Limit how long we will wait for the enqueue to complete */
858 req->rq_delay_limit = MGC_SEND_PARAM_LIMIT;
859 rc = ptlrpc_queue_wait(req);
860 if (!rc) {
861 rep_msp = req_capsule_server_get(&req->rq_pill, &RMF_MGS_SEND_PARAM);
862 memcpy(msp, rep_msp, sizeof(*rep_msp));
863 }
864
865 ptlrpc_req_finished(req);
866
0a3bdb00 867 return rc;
d7e09d03
PT
868}
869
870/* Take a config lock so we can get cancel notifications */
871static int mgc_enqueue(struct obd_export *exp, struct lov_stripe_md *lsm,
872 __u32 type, ldlm_policy_data_t *policy, __u32 mode,
873 __u64 *flags, void *bl_cb, void *cp_cb, void *gl_cb,
874 void *data, __u32 lvb_len, void *lvb_swabber,
875 struct lustre_handle *lockh)
876{
877 struct config_llog_data *cld = (struct config_llog_data *)data;
f2145eae
BK
878 struct ldlm_enqueue_info einfo = {
879 .ei_type = type,
880 .ei_mode = mode,
881 .ei_cb_bl = mgc_blocking_ast,
882 .ei_cb_cp = ldlm_completion_ast,
883 };
d7e09d03
PT
884 struct ptlrpc_request *req;
885 int short_limit = cld_is_sptlrpc(cld);
886 int rc;
d7e09d03 887
55f5a824 888 CDEBUG(D_MGC, "Enqueue for %s (res %#llx)\n", cld->cld_logname,
d7e09d03
PT
889 cld->cld_resid.name[0]);
890
891 /* We need a callback for every lockholder, so don't try to
892 ldlm_lock_match (see rev 1.1.2.11.2.47) */
893 req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp),
894 &RQF_LDLM_ENQUEUE, LUSTRE_DLM_VERSION,
895 LDLM_ENQUEUE);
896 if (req == NULL)
0a3bdb00 897 return -ENOMEM;
d7e09d03
PT
898
899 req_capsule_set_size(&req->rq_pill, &RMF_DLM_LVB, RCL_SERVER, 0);
900 ptlrpc_request_set_replen(req);
901
902 /* check if this is server or client */
903 if (cld->cld_cfg.cfg_sb) {
904 struct lustre_sb_info *lsi = s2lsi(cld->cld_cfg.cfg_sb);
905 if (lsi && IS_SERVER(lsi))
906 short_limit = 1;
907 }
908 /* Limit how long we will wait for the enqueue to complete */
909 req->rq_delay_limit = short_limit ? 5 : MGC_ENQUEUE_LIMIT;
910 rc = ldlm_cli_enqueue(exp, &req, &einfo, &cld->cld_resid, NULL, flags,
911 NULL, 0, LVB_T_NONE, lockh, 0);
912 /* A failed enqueue should still call the mgc_blocking_ast,
913 where it will be requeued if needed ("grant failed"). */
914 ptlrpc_req_finished(req);
0a3bdb00 915 return rc;
d7e09d03
PT
916}
917
d7e09d03
PT
918static void mgc_notify_active(struct obd_device *unused)
919{
920 /* wakeup mgc_requeue_thread to requeue mgc lock */
921 spin_lock(&config_list_lock);
922 rq_state |= RQ_NOW;
923 spin_unlock(&config_list_lock);
924 wake_up(&rq_waitq);
925
926 /* TODO: Help the MGS rebuild nidtbl. -jay */
927}
928
929/* Send target_reg message to MGS */
930static int mgc_target_register(struct obd_export *exp,
931 struct mgs_target_info *mti)
932{
933 struct ptlrpc_request *req;
934 struct mgs_target_info *req_mti, *rep_mti;
935 int rc;
d7e09d03
PT
936
937 req = ptlrpc_request_alloc_pack(class_exp2cliimp(exp),
938 &RQF_MGS_TARGET_REG, LUSTRE_MGS_VERSION,
939 MGS_TARGET_REG);
940 if (req == NULL)
0a3bdb00 941 return -ENOMEM;
d7e09d03
PT
942
943 req_mti = req_capsule_client_get(&req->rq_pill, &RMF_MGS_TARGET_INFO);
944 if (!req_mti) {
945 ptlrpc_req_finished(req);
0a3bdb00 946 return -ENOMEM;
d7e09d03
PT
947 }
948
949 memcpy(req_mti, mti, sizeof(*req_mti));
950 ptlrpc_request_set_replen(req);
951 CDEBUG(D_MGC, "register %s\n", mti->mti_svname);
952 /* Limit how long we will wait for the enqueue to complete */
953 req->rq_delay_limit = MGC_TARGET_REG_LIMIT;
954
955 rc = ptlrpc_queue_wait(req);
956 if (!rc) {
957 rep_mti = req_capsule_server_get(&req->rq_pill,
958 &RMF_MGS_TARGET_INFO);
959 memcpy(mti, rep_mti, sizeof(*rep_mti));
960 CDEBUG(D_MGC, "register %s got index = %d\n",
961 mti->mti_svname, mti->mti_stripe_index);
962 }
963 ptlrpc_req_finished(req);
964
0a3bdb00 965 return rc;
d7e09d03
PT
966}
967
0bf31f07 968static int mgc_set_info_async(const struct lu_env *env, struct obd_export *exp,
21aef7d9 969 u32 keylen, void *key, u32 vallen,
d7e09d03
PT
970 void *val, struct ptlrpc_request_set *set)
971{
972 int rc = -EINVAL;
d7e09d03
PT
973
974 /* Turn off initial_recov after we try all backup servers once */
975 if (KEY_IS(KEY_INIT_RECOV_BACKUP)) {
976 struct obd_import *imp = class_exp2cliimp(exp);
977 int value;
978 if (vallen != sizeof(int))
0a3bdb00 979 return -EINVAL;
d7e09d03
PT
980 value = *(int *)val;
981 CDEBUG(D_MGC, "InitRecov %s %d/d%d:i%d:r%d:or%d:%s\n",
982 imp->imp_obd->obd_name, value,
983 imp->imp_deactive, imp->imp_invalid,
984 imp->imp_replayable, imp->imp_obd->obd_replayable,
985 ptlrpc_import_state_name(imp->imp_state));
986 /* Resurrect if we previously died */
987 if ((imp->imp_state != LUSTRE_IMP_FULL &&
988 imp->imp_state != LUSTRE_IMP_NEW) || value > 1)
989 ptlrpc_reconnect_import(imp);
0a3bdb00 990 return 0;
d7e09d03 991 }
d7e09d03
PT
992 if (KEY_IS(KEY_SET_INFO)) {
993 struct mgs_send_param *msp;
994
995 msp = (struct mgs_send_param *)val;
996 rc = mgc_set_mgs_param(exp, msp);
0a3bdb00 997 return rc;
d7e09d03
PT
998 }
999 if (KEY_IS(KEY_MGSSEC)) {
1000 struct client_obd *cli = &exp->exp_obd->u.cli;
1001 struct sptlrpc_flavor flvr;
1002
1003 /*
1004 * empty string means using current flavor, if which haven't
1005 * been set yet, set it as null.
1006 *
1007 * if flavor has been set previously, check the asking flavor
1008 * must match the existing one.
1009 */
1010 if (vallen == 0) {
1011 if (cli->cl_flvr_mgc.sf_rpc != SPTLRPC_FLVR_INVALID)
0a3bdb00 1012 return 0;
d7e09d03
PT
1013 val = "null";
1014 vallen = 4;
1015 }
1016
1017 rc = sptlrpc_parse_flavor(val, &flvr);
1018 if (rc) {
1019 CERROR("invalid sptlrpc flavor %s to MGS\n",
1020 (char *) val);
0a3bdb00 1021 return rc;
d7e09d03
PT
1022 }
1023
1024 /*
1025 * caller already hold a mutex
1026 */
1027 if (cli->cl_flvr_mgc.sf_rpc == SPTLRPC_FLVR_INVALID) {
1028 cli->cl_flvr_mgc = flvr;
1029 } else if (memcmp(&cli->cl_flvr_mgc, &flvr,
1030 sizeof(flvr)) != 0) {
1031 char str[20];
1032
1033 sptlrpc_flavor2name(&cli->cl_flvr_mgc,
1034 str, sizeof(str));
2d00bd17 1035 LCONSOLE_ERROR("asking sptlrpc flavor %s to MGS but currently %s is in use\n",
d7e09d03
PT
1036 (char *) val, str);
1037 rc = -EPERM;
1038 }
0a3bdb00 1039 return rc;
d7e09d03
PT
1040 }
1041
0a3bdb00 1042 return rc;
d7e09d03
PT
1043}
1044
1045static int mgc_get_info(const struct lu_env *env, struct obd_export *exp,
1046 __u32 keylen, void *key, __u32 *vallen, void *val,
1047 struct lov_stripe_md *unused)
1048{
1049 int rc = -EINVAL;
1050
1051 if (KEY_IS(KEY_CONN_DATA)) {
1052 struct obd_import *imp = class_exp2cliimp(exp);
1053 struct obd_connect_data *data = val;
1054
1055 if (*vallen == sizeof(*data)) {
1056 *data = imp->imp_connect_data;
1057 rc = 0;
1058 }
1059 }
1060
1061 return rc;
1062}
1063
1064static int mgc_import_event(struct obd_device *obd,
1065 struct obd_import *imp,
1066 enum obd_import_event event)
1067{
d7e09d03
PT
1068 LASSERT(imp->imp_obd == obd);
1069 CDEBUG(D_MGC, "import event %#x\n", event);
1070
1071 switch (event) {
1072 case IMP_EVENT_DISCON:
1073 /* MGC imports should not wait for recovery */
1074 if (OCD_HAS_FLAG(&imp->imp_connect_data, IMP_RECOV))
1075 ptlrpc_pinger_ir_down();
1076 break;
1077 case IMP_EVENT_INACTIVE:
1078 break;
1079 case IMP_EVENT_INVALIDATE: {
1080 struct ldlm_namespace *ns = obd->obd_namespace;
1081 ldlm_namespace_cleanup(ns, LDLM_FL_LOCAL_ONLY);
1082 break;
1083 }
1084 case IMP_EVENT_ACTIVE:
1085 CDEBUG(D_INFO, "%s: Reactivating import\n", obd->obd_name);
1086 /* Clearing obd_no_recov allows us to continue pinging */
1087 obd->obd_no_recov = 0;
1088 mgc_notify_active(obd);
1089 if (OCD_HAS_FLAG(&imp->imp_connect_data, IMP_RECOV))
1090 ptlrpc_pinger_ir_up();
1091 break;
1092 case IMP_EVENT_OCD:
1093 break;
1094 case IMP_EVENT_DEACTIVATE:
1095 case IMP_EVENT_ACTIVATE:
1096 break;
1097 default:
1098 CERROR("Unknown import event %#x\n", event);
1099 LBUG();
1100 }
84827278 1101 return 0;
d7e09d03
PT
1102}
1103
d7e09d03
PT
1104enum {
1105 CONFIG_READ_NRPAGES_INIT = 1 << (20 - PAGE_CACHE_SHIFT),
1106 CONFIG_READ_NRPAGES = 4
1107};
1108
1109static int mgc_apply_recover_logs(struct obd_device *mgc,
1110 struct config_llog_data *cld,
1111 __u64 max_version,
1112 void *data, int datalen, bool mne_swab)
1113{
1114 struct config_llog_instance *cfg = &cld->cld_cfg;
1115 struct lustre_sb_info *lsi = s2lsi(cfg->cfg_sb);
1116 struct mgs_nidtbl_entry *entry;
1117 struct lustre_cfg *lcfg;
1118 struct lustre_cfg_bufs bufs;
1119 u64 prev_version = 0;
1120 char *inst;
1121 char *buf;
1122 int bufsz;
1123 int pos;
1124 int rc = 0;
1125 int off = 0;
d7e09d03
PT
1126
1127 LASSERT(cfg->cfg_instance != NULL);
1128 LASSERT(cfg->cfg_sb == cfg->cfg_instance);
1129
c9b4297f 1130 inst = kzalloc(PAGE_CACHE_SIZE, GFP_NOFS);
c829be81 1131 if (!inst)
0a3bdb00 1132 return -ENOMEM;
d7e09d03
PT
1133
1134 if (!IS_SERVER(lsi)) {
1135 pos = snprintf(inst, PAGE_CACHE_SIZE, "%p", cfg->cfg_instance);
1136 if (pos >= PAGE_CACHE_SIZE) {
c9b4297f 1137 kfree(inst);
d7e09d03
PT
1138 return -E2BIG;
1139 }
1140 } else {
1141 LASSERT(IS_MDT(lsi));
1142 rc = server_name2svname(lsi->lsi_svname, inst, NULL,
1143 PAGE_CACHE_SIZE);
1144 if (rc) {
c9b4297f 1145 kfree(inst);
0a3bdb00 1146 return -EINVAL;
d7e09d03
PT
1147 }
1148 pos = strlen(inst);
1149 }
1150
1151 ++pos;
1152 buf = inst + pos;
1153 bufsz = PAGE_CACHE_SIZE - pos;
1154
1155 while (datalen > 0) {
1156 int entry_len = sizeof(*entry);
1157 int is_ost;
1158 struct obd_device *obd;
1159 char *obdname;
1160 char *cname;
1161 char *params;
1162 char *uuid;
1163
1164 rc = -EINVAL;
1165 if (datalen < sizeof(*entry))
1166 break;
1167
1168 entry = (typeof(entry))(data + off);
1169
1170 /* sanity check */
1171 if (entry->mne_nid_type != 0) /* only support type 0 for ipv4 */
1172 break;
1173 if (entry->mne_nid_count == 0) /* at least one nid entry */
1174 break;
1175 if (entry->mne_nid_size != sizeof(lnet_nid_t))
1176 break;
1177
1178 entry_len += entry->mne_nid_count * entry->mne_nid_size;
1179 if (datalen < entry_len) /* must have entry_len at least */
1180 break;
1181
1182 /* Keep this swab for normal mixed endian handling. LU-1644 */
1183 if (mne_swab)
1184 lustre_swab_mgs_nidtbl_entry(entry);
1185 if (entry->mne_length > PAGE_CACHE_SIZE) {
1186 CERROR("MNE too large (%u)\n", entry->mne_length);
1187 break;
1188 }
1189
1190 if (entry->mne_length < entry_len)
1191 break;
1192
1193 off += entry->mne_length;
1194 datalen -= entry->mne_length;
1195 if (datalen < 0)
1196 break;
1197
1198 if (entry->mne_version > max_version) {
1199 CERROR("entry index(%lld) is over max_index(%lld)\n",
1200 entry->mne_version, max_version);
1201 break;
1202 }
1203
1204 if (prev_version >= entry->mne_version) {
1205 CERROR("index unsorted, prev %lld, now %lld\n",
1206 prev_version, entry->mne_version);
1207 break;
1208 }
1209 prev_version = entry->mne_version;
1210
1211 /*
1212 * Write a string with format "nid::instance" to
1213 * lustre/<osc|mdc>/<target>-<osc|mdc>-<instance>/import.
1214 */
1215
1216 is_ost = entry->mne_type == LDD_F_SV_TYPE_OST;
1217 memset(buf, 0, bufsz);
1218 obdname = buf;
1219 pos = 0;
1220
1221 /* lustre-OST0001-osc-<instance #> */
1222 strcpy(obdname, cld->cld_logname);
1223 cname = strrchr(obdname, '-');
1224 if (cname == NULL) {
1225 CERROR("mgc %s: invalid logname %s\n",
1226 mgc->obd_name, obdname);
1227 break;
1228 }
1229
1230 pos = cname - obdname;
1231 obdname[pos] = 0;
1232 pos += sprintf(obdname + pos, "-%s%04x",
1233 is_ost ? "OST" : "MDT", entry->mne_index);
1234
91f2208c 1235 cname = is_ost ? "osc" : "mdc";
d7e09d03
PT
1236 pos += sprintf(obdname + pos, "-%s-%s", cname, inst);
1237 lustre_cfg_bufs_reset(&bufs, obdname);
1238
1239 /* find the obd by obdname */
1240 obd = class_name2obd(obdname);
1241 if (obd == NULL) {
1242 CDEBUG(D_INFO, "mgc %s: cannot find obdname %s\n",
1243 mgc->obd_name, obdname);
1244 rc = 0;
1245 /* this is a safe race, when the ost is starting up...*/
1246 continue;
1247 }
1248
1249 /* osc.import = "connection=<Conn UUID>::<target instance>" */
1250 ++pos;
1251 params = buf + pos;
1252 pos += sprintf(params, "%s.import=%s", cname, "connection=");
1253 uuid = buf + pos;
1254
1255 down_read(&obd->u.cli.cl_sem);
1256 if (obd->u.cli.cl_import == NULL) {
1257 /* client does not connect to the OST yet */
1258 up_read(&obd->u.cli.cl_sem);
1259 rc = 0;
1260 continue;
1261 }
1262
1263 /* TODO: iterate all nids to find one */
1264 /* find uuid by nid */
1265 rc = client_import_find_conn(obd->u.cli.cl_import,
1266 entry->u.nids[0],
1267 (struct obd_uuid *)uuid);
1268 up_read(&obd->u.cli.cl_sem);
1269 if (rc < 0) {
1270 CERROR("mgc: cannot find uuid by nid %s\n",
1271 libcfs_nid2str(entry->u.nids[0]));
1272 break;
1273 }
1274
1275 CDEBUG(D_INFO, "Find uuid %s by nid %s\n",
1276 uuid, libcfs_nid2str(entry->u.nids[0]));
1277
1278 pos += strlen(uuid);
1279 pos += sprintf(buf + pos, "::%u", entry->mne_instance);
1280 LASSERT(pos < bufsz);
1281
1282 lustre_cfg_bufs_set_string(&bufs, 1, params);
1283
1284 rc = -ENOMEM;
1285 lcfg = lustre_cfg_new(LCFG_PARAM, &bufs);
1286 if (lcfg == NULL) {
1287 CERROR("mgc: cannot allocate memory\n");
1288 break;
1289 }
1290
f537dd2c 1291 CDEBUG(D_INFO, "ir apply logs %lld/%lld for %s -> %s\n",
d7e09d03
PT
1292 prev_version, max_version, obdname, params);
1293
1294 rc = class_process_config(lcfg);
1295 lustre_cfg_free(lcfg);
1296 if (rc)
1297 CDEBUG(D_INFO, "process config for %s error %d\n",
1298 obdname, rc);
1299
1300 /* continue, even one with error */
1301 }
1302
c9b4297f 1303 kfree(inst);
0a3bdb00 1304 return rc;
d7e09d03
PT
1305}
1306
1307/**
1308 * This function is called if this client was notified for target restarting
1309 * by the MGS. A CONFIG_READ RPC is going to send to fetch recovery logs.
1310 */
1311static int mgc_process_recover_log(struct obd_device *obd,
1312 struct config_llog_data *cld)
1313{
1314 struct ptlrpc_request *req = NULL;
1315 struct config_llog_instance *cfg = &cld->cld_cfg;
1316 struct mgs_config_body *body;
1317 struct mgs_config_res *res;
1318 struct ptlrpc_bulk_desc *desc;
1319 struct page **pages;
1320 int nrpages;
1321 bool eof = true;
1322 bool mne_swab = false;
1323 int i;
1324 int ealen;
1325 int rc;
d7e09d03
PT
1326
1327 /* allocate buffer for bulk transfer.
1328 * if this is the first time for this mgs to read logs,
1329 * CONFIG_READ_NRPAGES_INIT will be used since it will read all logs
1330 * once; otherwise, it only reads increment of logs, this should be
1331 * small and CONFIG_READ_NRPAGES will be used.
1332 */
1333 nrpages = CONFIG_READ_NRPAGES;
1334 if (cfg->cfg_last_idx == 0) /* the first time */
1335 nrpages = CONFIG_READ_NRPAGES_INIT;
1336
c9b4297f 1337 pages = kcalloc(nrpages, sizeof(*pages), GFP_NOFS);
74d3ba98
JL
1338 if (pages == NULL) {
1339 rc = -ENOMEM;
1340 goto out;
1341 }
d7e09d03
PT
1342
1343 for (i = 0; i < nrpages; i++) {
1344 pages[i] = alloc_page(GFP_IOFS);
74d3ba98
JL
1345 if (pages[i] == NULL) {
1346 rc = -ENOMEM;
1347 goto out;
1348 }
d7e09d03
PT
1349 }
1350
1351again:
1352 LASSERT(cld_is_recover(cld));
1353 LASSERT(mutex_is_locked(&cld->cld_lock));
1354 req = ptlrpc_request_alloc(class_exp2cliimp(cld->cld_mgcexp),
1355 &RQF_MGS_CONFIG_READ);
74d3ba98
JL
1356 if (req == NULL) {
1357 rc = -ENOMEM;
1358 goto out;
1359 }
d7e09d03
PT
1360
1361 rc = ptlrpc_request_pack(req, LUSTRE_MGS_VERSION, MGS_CONFIG_READ);
1362 if (rc)
74d3ba98 1363 goto out;
d7e09d03
PT
1364
1365 /* pack request */
1366 body = req_capsule_client_get(&req->rq_pill, &RMF_MGS_CONFIG_BODY);
1367 LASSERT(body != NULL);
1368 LASSERT(sizeof(body->mcb_name) > strlen(cld->cld_logname));
1369 if (strlcpy(body->mcb_name, cld->cld_logname, sizeof(body->mcb_name))
74d3ba98
JL
1370 >= sizeof(body->mcb_name)) {
1371 rc = -E2BIG;
1372 goto out;
1373 }
d7e09d03
PT
1374 body->mcb_offset = cfg->cfg_last_idx + 1;
1375 body->mcb_type = cld->cld_type;
1376 body->mcb_bits = PAGE_CACHE_SHIFT;
1377 body->mcb_units = nrpages;
1378
1379 /* allocate bulk transfer descriptor */
1380 desc = ptlrpc_prep_bulk_imp(req, nrpages, 1, BULK_PUT_SINK,
1381 MGS_BULK_PORTAL);
74d3ba98
JL
1382 if (desc == NULL) {
1383 rc = -ENOMEM;
1384 goto out;
1385 }
d7e09d03
PT
1386
1387 for (i = 0; i < nrpages; i++)
1388 ptlrpc_prep_bulk_page_pin(desc, pages[i], 0, PAGE_CACHE_SIZE);
1389
1390 ptlrpc_request_set_replen(req);
1391 rc = ptlrpc_queue_wait(req);
1392 if (rc)
74d3ba98 1393 goto out;
d7e09d03
PT
1394
1395 res = req_capsule_server_get(&req->rq_pill, &RMF_MGS_CONFIG_RES);
74d3ba98
JL
1396 if (res->mcr_size < res->mcr_offset) {
1397 rc = -EINVAL;
1398 goto out;
1399 }
d7e09d03
PT
1400
1401 /* always update the index even though it might have errors with
1402 * handling the recover logs */
1403 cfg->cfg_last_idx = res->mcr_offset;
1404 eof = res->mcr_offset == res->mcr_size;
1405
f537dd2c 1406 CDEBUG(D_INFO, "Latest version %lld, more %d.\n",
d7e09d03
PT
1407 res->mcr_offset, eof == false);
1408
1409 ealen = sptlrpc_cli_unwrap_bulk_read(req, req->rq_bulk, 0);
74d3ba98
JL
1410 if (ealen < 0) {
1411 rc = ealen;
1412 goto out;
1413 }
d7e09d03 1414
74d3ba98
JL
1415 if (ealen > nrpages << PAGE_CACHE_SHIFT) {
1416 rc = -EINVAL;
1417 goto out;
1418 }
d7e09d03
PT
1419
1420 if (ealen == 0) { /* no logs transferred */
1421 if (!eof)
1422 rc = -EINVAL;
74d3ba98 1423 goto out;
d7e09d03
PT
1424 }
1425
1426 mne_swab = !!ptlrpc_rep_need_swab(req);
1427#if LUSTRE_VERSION_CODE < OBD_OCD_VERSION(3, 2, 50, 0)
1428 /* This import flag means the server did an extra swab of IR MNE
1429 * records (fixed in LU-1252), reverse it here if needed. LU-1644 */
1430 if (unlikely(req->rq_import->imp_need_mne_swab))
1431 mne_swab = !mne_swab;
1432#else
1433#warning "LU-1644: Remove old OBD_CONNECT_MNE_SWAB fixup and imp_need_mne_swab"
1434#endif
1435
1436 for (i = 0; i < nrpages && ealen > 0; i++) {
1437 int rc2;
1438 void *ptr;
1439
1440 ptr = kmap(pages[i]);
1441 rc2 = mgc_apply_recover_logs(obd, cld, res->mcr_offset, ptr,
1442 min_t(int, ealen, PAGE_CACHE_SIZE),
1443 mne_swab);
1444 kunmap(pages[i]);
1445 if (rc2 < 0) {
1446 CWARN("Process recover log %s error %d\n",
1447 cld->cld_logname, rc2);
1448 break;
1449 }
1450
1451 ealen -= PAGE_CACHE_SIZE;
1452 }
1453
1454out:
1455 if (req)
1456 ptlrpc_req_finished(req);
1457
1458 if (rc == 0 && !eof)
1459 goto again;
1460
1461 if (pages) {
1462 for (i = 0; i < nrpages; i++) {
1463 if (pages[i] == NULL)
1464 break;
1465 __free_page(pages[i]);
1466 }
c9b4297f 1467 kfree(pages);
d7e09d03
PT
1468 }
1469 return rc;
1470}
1471
d7e09d03
PT
1472/* local_only means it cannot get remote llogs */
1473static int mgc_process_cfg_log(struct obd_device *mgc,
aa4e3c8a 1474 struct config_llog_data *cld, int local_only)
d7e09d03 1475{
0b79e161 1476 struct llog_ctxt *ctxt;
aa4e3c8a
MP
1477 struct lustre_sb_info *lsi = NULL;
1478 int rc = 0;
1479 bool sptlrpc_started = false;
1480 struct lu_env *env;
d7e09d03 1481
d7e09d03
PT
1482 LASSERT(cld);
1483 LASSERT(mutex_is_locked(&cld->cld_lock));
1484
1485 /*
1486 * local copy of sptlrpc log is controlled elsewhere, don't try to
1487 * read it up here.
1488 */
1489 if (cld_is_sptlrpc(cld) && local_only)
0a3bdb00 1490 return 0;
d7e09d03
PT
1491
1492 if (cld->cld_cfg.cfg_sb)
1493 lsi = s2lsi(cld->cld_cfg.cfg_sb);
1494
c9b4297f 1495 env = kzalloc(sizeof(*env), GFP_NOFS);
c829be81 1496 if (!env)
0a3bdb00 1497 return -ENOMEM;
d7e09d03 1498
aa4e3c8a
MP
1499 rc = lu_env_init(env, LCT_MG_THREAD);
1500 if (rc)
74d3ba98 1501 goto out_free;
aa4e3c8a
MP
1502
1503 ctxt = llog_get_context(mgc, LLOG_CONFIG_REPL_CTXT);
1504 LASSERT(ctxt);
1505
0b79e161
JH
1506 if (local_only) /* no local log at client side */ {
1507 rc = -EIO;
1508 goto out_pop;
d7e09d03
PT
1509 }
1510
1511 if (cld_is_sptlrpc(cld)) {
1512 sptlrpc_conf_log_update_begin(cld->cld_logname);
1513 sptlrpc_started = true;
1514 }
1515
1516 /* logname and instance info should be the same, so use our
aa4e3c8a
MP
1517 * copy of the instance for the update. The cfg_last_idx will
1518 * be updated here. */
1519 rc = class_config_parse_llog(env, ctxt, cld->cld_logname,
d7e09d03 1520 &cld->cld_cfg);
d7e09d03
PT
1521
1522out_pop:
aa4e3c8a 1523 __llog_ctxt_put(env, ctxt);
d7e09d03 1524
d7e09d03
PT
1525 /*
1526 * update settings on existing OBDs. doing it inside
1527 * of llog_process_lock so no device is attaching/detaching
1528 * in parallel.
1529 * the logname must be <fsname>-sptlrpc
1530 */
1531 if (sptlrpc_started) {
1532 LASSERT(cld_is_sptlrpc(cld));
1533 sptlrpc_conf_log_update_end(cld->cld_logname);
1534 class_notify_sptlrpc_conf(cld->cld_logname,
1535 strlen(cld->cld_logname) -
1536 strlen("-sptlrpc"));
1537 }
1538
aa4e3c8a
MP
1539 lu_env_fini(env);
1540out_free:
c9b4297f 1541 kfree(env);
0a3bdb00 1542 return rc;
d7e09d03
PT
1543}
1544
1545/** Get a config log from the MGS and process it.
1546 * This func is called for both clients and servers.
1547 * Copy the log locally before parsing it if appropriate (non-MGS server)
1548 */
1549int mgc_process_log(struct obd_device *mgc, struct config_llog_data *cld)
1550{
1551 struct lustre_handle lockh = { 0 };
1552 __u64 flags = LDLM_FL_NO_LRU;
1553 int rc = 0, rcl;
d7e09d03
PT
1554
1555 LASSERT(cld);
1556
1557 /* I don't want multiple processes running process_log at once --
1558 sounds like badness. It actually might be fine, as long as
1559 we're not trying to update from the same log
1560 simultaneously (in which case we should use a per-log sem.) */
1561 mutex_lock(&cld->cld_lock);
1562 if (cld->cld_stopping) {
1563 mutex_unlock(&cld->cld_lock);
0a3bdb00 1564 return 0;
d7e09d03
PT
1565 }
1566
1567 OBD_FAIL_TIMEOUT(OBD_FAIL_MGC_PAUSE_PROCESS_LOG, 20);
1568
1569 CDEBUG(D_MGC, "Process log %s:%p from %d\n", cld->cld_logname,
1570 cld->cld_cfg.cfg_instance, cld->cld_cfg.cfg_last_idx + 1);
1571
1572 /* Get the cfg lock on the llog */
1573 rcl = mgc_enqueue(mgc->u.cli.cl_mgc_mgsexp, NULL, LDLM_PLAIN, NULL,
1574 LCK_CR, &flags, NULL, NULL, NULL,
1575 cld, 0, NULL, &lockh);
1576 if (rcl == 0) {
1577 /* Get the cld, it will be released in mgc_blocking_ast. */
1578 config_log_get(cld);
1579 rc = ldlm_lock_set_data(&lockh, (void *)cld);
1580 LASSERT(rc == 0);
1581 } else {
1582 CDEBUG(D_MGC, "Can't get cfg lock: %d\n", rcl);
1583
1584 /* mark cld_lostlock so that it will requeue
1585 * after MGC becomes available. */
1586 cld->cld_lostlock = 1;
1587 /* Get extra reference, it will be put in requeue thread */
1588 config_log_get(cld);
1589 }
1590
1591
1592 if (cld_is_recover(cld)) {
1593 rc = 0; /* this is not a fatal error for recover log */
1594 if (rcl == 0)
1595 rc = mgc_process_recover_log(mgc, cld);
1596 } else {
1597 rc = mgc_process_cfg_log(mgc, cld, rcl != 0);
1598 }
1599
1600 CDEBUG(D_MGC, "%s: configuration from log '%s' %sed (%d).\n",
1601 mgc->obd_name, cld->cld_logname, rc ? "fail" : "succeed", rc);
1602
1603 mutex_unlock(&cld->cld_lock);
1604
1605 /* Now drop the lock so MGS can revoke it */
8d3d9848
JH
1606 if (!rcl)
1607 ldlm_lock_decref(&lockh, LCK_CR);
d7e09d03 1608
0a3bdb00 1609 return rc;
d7e09d03
PT
1610}
1611
1612
1613/** Called from lustre_process_log.
1614 * LCFG_LOG_START gets the config log from the MGS, processes it to start
1615 * any services, and adds it to the list logs to watch (follow).
1616 */
21aef7d9 1617static int mgc_process_config(struct obd_device *obd, u32 len, void *buf)
d7e09d03
PT
1618{
1619 struct lustre_cfg *lcfg = buf;
1620 struct config_llog_instance *cfg = NULL;
1621 char *logname;
1622 int rc = 0;
d7e09d03 1623
37821997 1624 switch (lcfg->lcfg_command) {
d7e09d03
PT
1625 case LCFG_LOV_ADD_OBD: {
1626 /* Overloading this cfg command: register a new target */
1627 struct mgs_target_info *mti;
1628
1629 if (LUSTRE_CFG_BUFLEN(lcfg, 1) !=
74d3ba98
JL
1630 sizeof(struct mgs_target_info)) {
1631 rc = -EINVAL;
1632 goto out;
1633 }
d7e09d03
PT
1634
1635 mti = (struct mgs_target_info *)lustre_cfg_buf(lcfg, 1);
1636 CDEBUG(D_MGC, "add_target %s %#x\n",
1637 mti->mti_svname, mti->mti_flags);
1638 rc = mgc_target_register(obd->u.cli.cl_mgc_mgsexp, mti);
1639 break;
1640 }
1641 case LCFG_LOV_DEL_OBD:
1642 /* Unregister has no meaning at the moment. */
1643 CERROR("lov_del_obd unimplemented\n");
1644 rc = -ENOSYS;
1645 break;
1646 case LCFG_SPTLRPC_CONF: {
1647 rc = sptlrpc_process_config(lcfg);
1648 break;
1649 }
1650 case LCFG_LOG_START: {
1651 struct config_llog_data *cld;
1652 struct super_block *sb;
1653
1654 logname = lustre_cfg_string(lcfg, 1);
1655 cfg = (struct config_llog_instance *)lustre_cfg_buf(lcfg, 2);
1656 sb = *(struct super_block **)lustre_cfg_buf(lcfg, 3);
1657
1658 CDEBUG(D_MGC, "parse_log %s from %d\n", logname,
1659 cfg->cfg_last_idx);
1660
1661 /* We're only called through here on the initial mount */
1662 rc = config_log_add(obd, logname, cfg, sb);
1663 if (rc)
1664 break;
1665 cld = config_log_find(logname, cfg);
1666 if (cld == NULL) {
1667 rc = -ENOENT;
1668 break;
1669 }
1670
1671 /* COMPAT_146 */
1672 /* FIXME only set this for old logs! Right now this forces
1673 us to always skip the "inside markers" check */
1674 cld->cld_cfg.cfg_flags |= CFG_F_COMPAT146;
1675
1676 rc = mgc_process_log(obd, cld);
1677 if (rc == 0 && cld->cld_recover != NULL) {
1678 if (OCD_HAS_FLAG(&obd->u.cli.cl_import->
1679 imp_connect_data, IMP_RECOV)) {
1680 rc = mgc_process_log(obd, cld->cld_recover);
1681 } else {
1682 struct config_llog_data *cir = cld->cld_recover;
1683 cld->cld_recover = NULL;
1684 config_log_put(cir);
1685 }
1686 if (rc)
1687 CERROR("Cannot process recover llog %d\n", rc);
1688 }
7d4bae45
AB
1689
1690 if (rc == 0 && cld->cld_params != NULL) {
1691 rc = mgc_process_log(obd, cld->cld_params);
1692 if (rc == -ENOENT) {
1693 CDEBUG(D_MGC,
1694 "There is no params config file yet\n");
1695 rc = 0;
1696 }
1697 /* params log is optional */
1698 if (rc)
1699 CERROR(
1700 "%s: can't process params llog: rc = %d\n",
1701 obd->obd_name, rc);
1702 }
d7e09d03
PT
1703 config_log_put(cld);
1704
1705 break;
1706 }
1707 case LCFG_LOG_END: {
1708 logname = lustre_cfg_string(lcfg, 1);
1709
1710 if (lcfg->lcfg_bufcount >= 2)
1711 cfg = (struct config_llog_instance *)lustre_cfg_buf(
1712 lcfg, 2);
1713 rc = config_log_end(logname, cfg);
1714 break;
1715 }
1716 default: {
1717 CERROR("Unknown command: %d\n", lcfg->lcfg_command);
74d3ba98
JL
1718 rc = -EINVAL;
1719 goto out;
d7e09d03
PT
1720
1721 }
1722 }
1723out:
0a3bdb00 1724 return rc;
d7e09d03
PT
1725}
1726
1727struct obd_ops mgc_obd_ops = {
1728 .o_owner = THIS_MODULE,
1729 .o_setup = mgc_setup,
1730 .o_precleanup = mgc_precleanup,
1731 .o_cleanup = mgc_cleanup,
1732 .o_add_conn = client_import_add_conn,
1733 .o_del_conn = client_import_del_conn,
1734 .o_connect = client_connect_import,
1735 .o_disconnect = client_disconnect_export,
57c4b127 1736 /* .o_enqueue = mgc_enqueue, */
57c4b127 1737 /* .o_iocontrol = mgc_iocontrol, */
d7e09d03
PT
1738 .o_set_info_async = mgc_set_info_async,
1739 .o_get_info = mgc_get_info,
1740 .o_import_event = mgc_import_event,
d7e09d03
PT
1741 .o_process_config = mgc_process_config,
1742};
1743
0bf31f07 1744static int __init mgc_init(void)
d7e09d03 1745{
2962b440 1746 return class_register_type(&mgc_obd_ops, NULL,
d7e09d03
PT
1747 LUSTRE_MGC_NAME, NULL);
1748}
1749
1750static void /*__exit*/ mgc_exit(void)
1751{
1752 class_unregister_type(LUSTRE_MGC_NAME);
1753}
1754
1755MODULE_AUTHOR("Sun Microsystems, Inc. <http://www.lustre.org/>");
1756MODULE_DESCRIPTION("Lustre Management Client");
1757MODULE_LICENSE("GPL");
1758
1759module_init(mgc_init);
1760module_exit(mgc_exit);