staging: lustre: Coalesce string fragments
drivers/staging/lustre/lustre/ptlrpc/ptlrpcd.c
/*
 * GPL HEADER START
 *
 * DO NOT ALTER OR REMOVE COPYRIGHT NOTICES OR THIS FILE HEADER.
 *
 * This program is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License version 2 only,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but
 * WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
 * General Public License version 2 for more details (a copy is included
 * in the LICENSE file that accompanied this code).
 *
 * You should have received a copy of the GNU General Public License
 * version 2 along with this program; If not, see
 * http://www.sun.com/software/products/lustre/docs/GPLv2.pdf
 *
 * Please contact Sun Microsystems, Inc., 4150 Network Circle, Santa Clara,
 * CA 95054 USA or visit www.sun.com if you need additional information or
 * have any questions.
 *
 * GPL HEADER END
 */
/*
 * Copyright (c) 2003, 2010, Oracle and/or its affiliates. All rights reserved.
 * Use is subject to license terms.
 *
 * Copyright (c) 2011, 2012, Intel Corporation.
 */
/*
 * This file is part of Lustre, http://www.lustre.org/
 * Lustre is a trademark of Sun Microsystems, Inc.
 *
 * lustre/ptlrpc/ptlrpcd.c
 */

/** \defgroup ptlrpcd PortalRPC daemon
 *
 * ptlrpcd is a special thread with its own set where other users may add
 * requests when they don't want to wait for their completion.
 * PtlRPCD will take care of sending such requests and then processing their
 * replies and calling completion callbacks as necessary.
 * The callbacks are called directly from ptlrpcd context.
 * It is important never to block significantly (esp. on RPCs!) within such a
 * completion handler, or a deadlock might occur where ptlrpcd enters some
 * callback that attempts to send another RPC and waits for it to return,
 * during which time ptlrpcd is completely blocked; e.g. if an import
 * fails, recovery cannot progress because connection requests are also
 * sent by ptlrpcd.
 *
 * @{
 */
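
/*
 * Usage sketch (illustrative only, not lifted from a real caller; the
 * my_interpret callback is hypothetical): code that does not want to wait
 * for an RPC hands it to ptlrpcd instead of driving its own request set:
 *
 *	req->rq_interpret_reply = my_interpret;
 *	ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
 *
 * ptlrpcd sends the request and later calls my_interpret() from ptlrpcd
 * context once the reply (or an error) arrives, so the callback must not
 * block (see the warning above).
 */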

#define DEBUG_SUBSYSTEM S_RPC

#include "../../include/linux/libcfs/libcfs.h"

#include "../include/lustre_net.h"
#include "../include/lustre_lib.h"
#include "../include/lustre_ha.h"
#include "../include/obd_class.h" /* for obd_zombie */
#include "../include/obd_support.h" /* for OBD_FAIL_CHECK */
#include "../include/cl_object.h" /* cl_env_{get,put}() */
#include "../include/lprocfs_status.h"

#include "ptlrpc_internal.h"

struct ptlrpcd {
	int pd_size;
	int pd_index;
	int pd_nthreads;
	struct ptlrpcd_ctl pd_thread_rcv;
	struct ptlrpcd_ctl pd_threads[0];
};

static int max_ptlrpcds;
module_param(max_ptlrpcds, int, 0644);
MODULE_PARM_DESC(max_ptlrpcds, "Max ptlrpcd thread count to be started.");

static int ptlrpcd_bind_policy = PDB_POLICY_PAIR;
module_param(ptlrpcd_bind_policy, int, 0644);
MODULE_PARM_DESC(ptlrpcd_bind_policy, "Ptlrpcd threads binding mode.");
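
/*
 * Example (a sketch; assumes this file is built into the ptlrpc module as
 * in this tree): the thread count can be capped at module load time with
 * "modprobe ptlrpc max_ptlrpcds=4", and both parameters are also visible
 * under /sys/module/ptlrpc/parameters/ because they are registered with
 * mode 0644 above.  max_ptlrpcds is only consulted when ptlrpcd_init()
 * runs, i.e. when the first user calls ptlrpcd_addref();
 * ptlrpcd_bind_policy takes one of the PDB_POLICY_* values.
 */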
static struct ptlrpcd *ptlrpcds;

struct mutex ptlrpcd_mutex;
static int ptlrpcd_users = 0;

void ptlrpcd_wake(struct ptlrpc_request *req)
{
	struct ptlrpc_request_set *rq_set = req->rq_set;

	LASSERT(rq_set != NULL);

	wake_up(&rq_set->set_waitq);
}
EXPORT_SYMBOL(ptlrpcd_wake);

static struct ptlrpcd_ctl *
ptlrpcd_select_pc(struct ptlrpc_request *req, pdl_policy_t policy, int index)
{
	int idx = 0;

	if (req != NULL && req->rq_send_state != LUSTRE_IMP_FULL)
		return &ptlrpcds->pd_thread_rcv;

	switch (policy) {
	case PDL_POLICY_SAME:
		idx = smp_processor_id() % ptlrpcds->pd_nthreads;
		break;
	case PDL_POLICY_LOCAL:
		/* Before the CPU partition patches are available, process it
		 * the same way as PDL_POLICY_ROUND. */
# ifdef CFS_CPU_MODE_NUMA
# warning "fix this code to use new CPU partition APIs"
# endif
		/* Fall through to PDL_POLICY_ROUND until the CPU partition
		 * patches are available. */
		index = -1;
	case PDL_POLICY_PREFERRED:
		if (index >= 0 && index < num_online_cpus()) {
			idx = index % ptlrpcds->pd_nthreads;
			break;
		}
		/* Fall through to PDL_POLICY_ROUND for a bad index. */
	default:
		/* Fall through to PDL_POLICY_ROUND for an unknown policy. */
	case PDL_POLICY_ROUND:
		/* We do not care whether it is strict load balance. */
		idx = ptlrpcds->pd_index + 1;
		if (idx == smp_processor_id())
			idx++;
		idx %= ptlrpcds->pd_nthreads;
		ptlrpcds->pd_index = idx;
		break;
	}

	return &ptlrpcds->pd_threads[idx];
}
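
/*
 * Summary of the mapping above: a request whose import is not yet FULL is
 * always handed to the recovery thread (pd_thread_rcv).  Otherwise
 * PDL_POLICY_SAME keys off the current CPU, PDL_POLICY_PREFERRED uses the
 * caller-supplied index when it is valid, and PDL_POLICY_LOCAL together
 * with any other value currently falls back to the round-robin
 * PDL_POLICY_ROUND cursor kept in ptlrpcds->pd_index.
 */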

/**
 * Move all requests from an existing request set to the ptlrpcd queue.
 * All requests from the set must be in phase RQ_PHASE_NEW.
 */
void ptlrpcd_add_rqset(struct ptlrpc_request_set *set)
{
	struct list_head *tmp, *pos;
	struct ptlrpcd_ctl *pc;
	struct ptlrpc_request_set *new;
	int count, i;

	pc = ptlrpcd_select_pc(NULL, PDL_POLICY_LOCAL, -1);
	new = pc->pc_set;

	list_for_each_safe(pos, tmp, &set->set_requests) {
		struct ptlrpc_request *req =
			list_entry(pos, struct ptlrpc_request,
				   rq_set_chain);

		LASSERT(req->rq_phase == RQ_PHASE_NEW);
		req->rq_set = new;
		req->rq_queued_time = cfs_time_current();
	}

	spin_lock(&new->set_new_req_lock);
	list_splice_init(&set->set_requests, &new->set_new_requests);
	i = atomic_read(&set->set_remaining);
	count = atomic_add_return(i, &new->set_new_count);
	atomic_set(&set->set_remaining, 0);
	spin_unlock(&new->set_new_req_lock);
	if (count == i) {
		wake_up(&new->set_waitq);

		/* XXX: It may be unnecessary to wake up all the partners,
		 * but to guarantee that async RPCs are processed ASAP we
		 * have no better choice.  This may be fixed in the future. */
		for (i = 0; i < pc->pc_npartners; i++)
			wake_up(&pc->pc_partners[i]->pc_set->set_waitq);
	}
}
EXPORT_SYMBOL(ptlrpcd_add_rqset);

/**
 * Return the number of transferred RPCs.
 */
static int ptlrpcd_steal_rqset(struct ptlrpc_request_set *des,
			       struct ptlrpc_request_set *src)
{
	struct list_head *tmp, *pos;
	struct ptlrpc_request *req;
	int rc = 0;

	spin_lock(&src->set_new_req_lock);
	if (likely(!list_empty(&src->set_new_requests))) {
		list_for_each_safe(pos, tmp, &src->set_new_requests) {
			req = list_entry(pos, struct ptlrpc_request,
					 rq_set_chain);
			req->rq_set = des;
		}
		list_splice_init(&src->set_new_requests,
				 &des->set_requests);
		rc = atomic_read(&src->set_new_count);
		atomic_add(rc, &des->set_remaining);
		atomic_set(&src->set_new_count, 0);
	}
	spin_unlock(&src->set_new_req_lock);
	return rc;
}

/**
 * Requests that are added to the ptlrpcd queue are sent via
 * ptlrpcd_check->ptlrpc_check_set().
 */
void ptlrpcd_add_req(struct ptlrpc_request *req, pdl_policy_t policy, int idx)
{
	struct ptlrpcd_ctl *pc;

	if (req->rq_reqmsg)
		lustre_msg_set_jobid(req->rq_reqmsg, NULL);

	spin_lock(&req->rq_lock);
	if (req->rq_invalid_rqset) {
		struct l_wait_info lwi = LWI_TIMEOUT(cfs_time_seconds(5),
						     back_to_sleep, NULL);

		req->rq_invalid_rqset = 0;
		spin_unlock(&req->rq_lock);
		l_wait_event(req->rq_set_waitq, (req->rq_set == NULL), &lwi);
	} else if (req->rq_set) {
		/* If we have a valid "rq_set", just reuse it to avoid double
		 * linking. */
		LASSERT(req->rq_phase == RQ_PHASE_NEW);
		LASSERT(req->rq_send_state == LUSTRE_IMP_REPLAY);

		/* ptlrpc_check_set will decrease the count */
		atomic_inc(&req->rq_set->set_remaining);
		spin_unlock(&req->rq_lock);
		wake_up(&req->rq_set->set_waitq);
		return;
	} else {
		spin_unlock(&req->rq_lock);
	}

	pc = ptlrpcd_select_pc(req, policy, idx);

	DEBUG_REQ(D_INFO, req, "add req [%p] to pc [%s:%d]",
		  req, pc->pc_name, pc->pc_index);

	ptlrpc_set_add_new_req(pc, req);
}
EXPORT_SYMBOL(ptlrpcd_add_req);

static inline void ptlrpc_reqset_get(struct ptlrpc_request_set *set)
{
	atomic_inc(&set->set_refcount);
}

/**
 * Check if there is more work to do on ptlrpcd set.
 * Returns 1 if yes.
 */
static int ptlrpcd_check(struct lu_env *env, struct ptlrpcd_ctl *pc)
{
	struct list_head *tmp, *pos;
	struct ptlrpc_request *req;
	struct ptlrpc_request_set *set = pc->pc_set;
	int rc = 0;
	int rc2;

	if (atomic_read(&set->set_new_count)) {
		spin_lock(&set->set_new_req_lock);
		if (likely(!list_empty(&set->set_new_requests))) {
			list_splice_init(&set->set_new_requests,
					 &set->set_requests);
			atomic_add(atomic_read(&set->set_new_count),
				   &set->set_remaining);
			atomic_set(&set->set_new_count, 0);
			/*
			 * Need to calculate its timeout.
			 */
			rc = 1;
		}
		spin_unlock(&set->set_new_req_lock);
	}

	/* We should call lu_env_refill() before handling new requests to make
	 * sure that the env key the requests depend on really exists.
	 */
	rc2 = lu_env_refill(env);
	if (rc2 != 0) {
		/*
		 * XXX This is a very awkward situation, because
		 * execution can neither continue (request
		 * interpreters assume that env is set up), nor repeat
		 * the loop (as this potentially results in a tight
		 * loop of -ENOMEM's).
		 *
		 * Fortunately, refill only ever does something when
		 * new modules are loaded, i.e., early during boot up.
		 */
		CERROR("Failure to refill session: %d\n", rc2);
		return rc;
	}

	if (atomic_read(&set->set_remaining))
		rc |= ptlrpc_check_set(env, set);

	if (!list_empty(&set->set_requests)) {
		/*
		 * XXX: our set never completes, so we prune the completed
		 * reqs after each iteration. boy could this be smarter.
		 */
		list_for_each_safe(pos, tmp, &set->set_requests) {
			req = list_entry(pos, struct ptlrpc_request,
					 rq_set_chain);
			if (req->rq_phase != RQ_PHASE_COMPLETE)
				continue;

			list_del_init(&req->rq_set_chain);
			req->rq_set = NULL;
			ptlrpc_req_finished(req);
		}
	}

	if (rc == 0) {
		/*
		 * If new requests have been added, make sure to wake up.
		 */
		rc = atomic_read(&set->set_new_count);

		/* If we have nothing to do, check whether we can take some
		 * work from our partner threads. */
		if (rc == 0 && pc->pc_npartners > 0) {
			struct ptlrpcd_ctl *partner;
			struct ptlrpc_request_set *ps;
			int first = pc->pc_cursor;

			do {
				partner = pc->pc_partners[pc->pc_cursor++];
				if (pc->pc_cursor >= pc->pc_npartners)
					pc->pc_cursor = 0;
				if (partner == NULL)
					continue;

				spin_lock(&partner->pc_lock);
				ps = partner->pc_set;
				if (ps == NULL) {
					spin_unlock(&partner->pc_lock);
					continue;
				}

				ptlrpc_reqset_get(ps);
				spin_unlock(&partner->pc_lock);

				if (atomic_read(&ps->set_new_count)) {
					rc = ptlrpcd_steal_rqset(set, ps);
					if (rc > 0)
						CDEBUG(D_RPCTRACE, "transfer %d async RPCs [%d->%d]\n",
						       rc, partner->pc_index,
						       pc->pc_index);
				}
				ptlrpc_reqset_put(ps);
			} while (rc == 0 && pc->pc_cursor != first);
		}
	}

	return rc;
}

/**
 * Main ptlrpcd thread.
 * ptlrpc's code paths like to execute in process context, so we have this
 * thread which spins on a set which contains the rpcs and sends them.
 *
 */
static int ptlrpcd(void *arg)
{
	struct ptlrpcd_ctl *pc = arg;
	struct ptlrpc_request_set *set = pc->pc_set;
	struct lu_env env = { .le_ses = NULL };
	int rc, exit = 0;

	unshare_fs_struct();
#if defined(CONFIG_SMP)
	if (test_bit(LIOD_BIND, &pc->pc_flags)) {
		int index = pc->pc_index;

		if (index >= 0 && index < num_possible_cpus()) {
			while (!cpu_online(index)) {
				if (++index >= num_possible_cpus())
					index = 0;
			}
			set_cpus_allowed_ptr(current,
					     cpumask_of_node(cpu_to_node(index)));
		}
	}
#endif
	/*
	 * XXX So far only "client" ptlrpcd uses an environment. In
	 * the future, the ptlrpcd thread (or a thread-set) has to be given
	 * an argument, describing its "scope".
	 */
	rc = lu_context_init(&env.le_ctx,
			     LCT_CL_THREAD|LCT_REMEMBER|LCT_NOREF);
	complete(&pc->pc_starting);

	if (rc != 0)
		return rc;

	/*
	 * This mainloop strongly resembles ptlrpc_set_wait() except that our
	 * set never completes. ptlrpcd_check() calls ptlrpc_check_set() when
	 * there are requests in the set. New requests come in on the set's
	 * new_req_list and ptlrpcd_check() moves them into the set.
	 */
	do {
		struct l_wait_info lwi;
		int timeout;

		timeout = ptlrpc_set_next_timeout(set);
		lwi = LWI_TIMEOUT(cfs_time_seconds(timeout ? timeout : 1),
				  ptlrpc_expired_set, set);

		lu_context_enter(&env.le_ctx);
		l_wait_event(set->set_waitq,
			     ptlrpcd_check(&env, pc), &lwi);
		lu_context_exit(&env.le_ctx);

		/*
		 * Abort inflight rpcs for the forced stop case.
		 */
		if (test_bit(LIOD_STOP, &pc->pc_flags)) {
			if (test_bit(LIOD_FORCE, &pc->pc_flags))
				ptlrpc_abort_set(set);
			exit++;
		}

		/*
		 * Let's make one more loop to make sure that ptlrpcd_check()
		 * copied all raced new rpcs into the set so we can kill them.
		 */
	} while (exit < 2);

	/*
	 * Wait for inflight requests to drain.
	 */
	if (!list_empty(&set->set_requests))
		ptlrpc_set_wait(set);
	lu_context_fini(&env.le_ctx);

	complete(&pc->pc_finishing);

	return 0;
}

/* XXX: We want multiple CPU cores to share the async RPC load, so we start
 * many ptlrpcd threads.  We also want to reduce the ptlrpcd overhead caused
 * by data transfers across CPU cores, so we bind each ptlrpcd thread to a
 * specific CPU core.  But binding all ptlrpcd threads may cause response
 * delays when some CPU core(s) are busy with other loads.
 *
 * For example, for "ls -l", some async RPCs for statahead are assigned to
 * ptlrpcd_0, and ptlrpcd_0 is bound to CPU_0, but CPU_0 may be quite busy
 * with non-ptlrpcd work, such as "ls -l" itself (we want the "ls -l" thread,
 * the statahead thread, and the ptlrpcd thread to run in parallel).  In such
 * a case the statahead async RPCs cannot be processed in time, which is
 * unexpected.  If ptlrpcd_0 could be rescheduled on another CPU core it
 * might do better, but that breaks the data transfer policy above.
 *
 * So we should not be blind about avoiding the data transfer.  We make a
 * compromise: divide the ptlrpcd thread pool into two parts.  One part is
 * for bound mode; each ptlrpcd thread in this part is bound to some CPU
 * core.  The other part is for free mode; all the ptlrpcd threads in this
 * part can be scheduled on any CPU core.  We specify some partnership
 * between bound mode ptlrpcd thread(s) and free mode ptlrpcd thread(s),
 * and the async RPC load is shared among the partners.
 *
 * This partly avoids data transfers across CPUs (as long as the bound mode
 * ptlrpcd thread can be scheduled in time) while still trying to guarantee
 * that async RPCs are processed ASAP (since the free mode ptlrpcd thread
 * can be scheduled on any CPU core).
 *
 * As for how to specify the partnership between bound mode ptlrpcd
 * thread(s) and free mode ptlrpcd thread(s), the simplest way is to use a
 * <free, bound> pair.  In the future we can specify a more complex
 * partnership based on the CPU partition patches, but until those patches
 * are available we prefer to use the simplest scheme.
 */
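
/*
 * Illustration of the default PDB_POLICY_PAIR scheme (derived from
 * ptlrpcd_bind() below, e.g. with four threads): the odd-indexed threads
 * ptlrpcd_1 and ptlrpcd_3 get LIOD_BIND set and each partners with the
 * preceding free thread, giving the <free, bound> pairs <0, 1> and <2, 3>;
 * the async RPC load is shared within each pair via ptlrpcd_steal_rqset().
 */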
# ifdef CFS_CPU_MODE_NUMA
# warning "fix ptlrpcd_bind() to use new CPU partition APIs"
# endif
static int ptlrpcd_bind(int index, int max)
{
	struct ptlrpcd_ctl *pc;
	int rc = 0;
#if defined(CONFIG_NUMA)
	cpumask_t mask;
#endif

	LASSERT(index <= max - 1);
	pc = &ptlrpcds->pd_threads[index];
	switch (ptlrpcd_bind_policy) {
	case PDB_POLICY_NONE:
		pc->pc_npartners = -1;
		break;
	case PDB_POLICY_FULL:
		pc->pc_npartners = 0;
		set_bit(LIOD_BIND, &pc->pc_flags);
		break;
	case PDB_POLICY_PAIR:
		LASSERT(max % 2 == 0);
		pc->pc_npartners = 1;
		break;
	case PDB_POLICY_NEIGHBOR:
#if defined(CONFIG_NUMA)
	{
		int i;
		mask = *cpumask_of_node(cpu_to_node(index));
		for (i = max; i < num_online_cpus(); i++)
			cpu_clear(i, mask);
		pc->pc_npartners = cpus_weight(mask) - 1;
		set_bit(LIOD_BIND, &pc->pc_flags);
	}
#else
		LASSERT(max >= 3);
		pc->pc_npartners = 2;
#endif
		break;
	default:
		CERROR("unknown ptlrpcd bind policy %d\n", ptlrpcd_bind_policy);
		rc = -EINVAL;
	}

	if (rc == 0 && pc->pc_npartners > 0) {
		OBD_ALLOC(pc->pc_partners,
			  sizeof(struct ptlrpcd_ctl *) * pc->pc_npartners);
		if (pc->pc_partners == NULL) {
			pc->pc_npartners = 0;
			rc = -ENOMEM;
		} else {
			switch (ptlrpcd_bind_policy) {
			case PDB_POLICY_PAIR:
				if (index & 0x1) {
					set_bit(LIOD_BIND, &pc->pc_flags);
					pc->pc_partners[0] = &ptlrpcds->
						pd_threads[index - 1];
					ptlrpcds->pd_threads[index - 1].
						pc_partners[0] = pc;
				}
				break;
			case PDB_POLICY_NEIGHBOR:
#if defined(CONFIG_NUMA)
			{
				struct ptlrpcd_ctl *ppc;
				int i, pidx;
				/* Partners are cores in the same NUMA node.
				 * Set up the partnership only with ptlrpcd
				 * threads that are already initialized.
				 */
				for (pidx = 0, i = 0; i < index; i++) {
					if (cpu_isset(i, mask)) {
						ppc = &ptlrpcds->pd_threads[i];
						pc->pc_partners[pidx++] = ppc;
						ppc->pc_partners[ppc->
							  pc_npartners++] = pc;
					}
				}
				/* Adjust the number of partners to the number
				 * of partnerships actually set up. */
				pc->pc_npartners = pidx;
			}
#else
				if (index & 0x1)
					set_bit(LIOD_BIND, &pc->pc_flags);
				if (index > 0) {
					pc->pc_partners[0] = &ptlrpcds->
						pd_threads[index - 1];
					ptlrpcds->pd_threads[index - 1].
						pc_partners[1] = pc;
					if (index == max - 1) {
						pc->pc_partners[1] =
							&ptlrpcds->pd_threads[0];
						ptlrpcds->pd_threads[0].
							pc_partners[0] = pc;
					}
				}
#endif
				break;
			}
		}
	}

	return rc;
}


int ptlrpcd_start(int index, int max, const char *name, struct ptlrpcd_ctl *pc)
{
	int rc;

	/*
	 * Do not allow starting a second thread for one pc.
	 */
	if (test_and_set_bit(LIOD_START, &pc->pc_flags)) {
		CWARN("Starting second thread (%s) for same pc %p\n",
		      name, pc);
		return 0;
	}

	pc->pc_index = index;
	init_completion(&pc->pc_starting);
	init_completion(&pc->pc_finishing);
	spin_lock_init(&pc->pc_lock);
	strlcpy(pc->pc_name, name, sizeof(pc->pc_name));
	pc->pc_set = ptlrpc_prep_set();
	if (pc->pc_set == NULL) {
		rc = -ENOMEM;
		goto out;
	}

	/*
	 * So far only "client" ptlrpcd uses an environment. In the future,
	 * ptlrpcd thread (or a thread-set) has to be given an argument,
	 * describing its "scope".
	 */
	rc = lu_context_init(&pc->pc_env.le_ctx, LCT_CL_THREAD|LCT_REMEMBER);
	if (rc != 0)
		goto out_set;

	{
		struct task_struct *task;

		if (index >= 0) {
			rc = ptlrpcd_bind(index, max);
			if (rc < 0)
				goto out_env;
		}

		task = kthread_run(ptlrpcd, pc, "%s", pc->pc_name);
		if (IS_ERR(task)) {
			rc = PTR_ERR(task);
			goto out_env;
		}

		wait_for_completion(&pc->pc_starting);
	}
	return 0;

out_env:
	lu_context_fini(&pc->pc_env.le_ctx);

out_set:
	if (pc->pc_set != NULL) {
		struct ptlrpc_request_set *set = pc->pc_set;

		spin_lock(&pc->pc_lock);
		pc->pc_set = NULL;
		spin_unlock(&pc->pc_lock);
		ptlrpc_set_destroy(set);
	}
	clear_bit(LIOD_BIND, &pc->pc_flags);

out:
	clear_bit(LIOD_START, &pc->pc_flags);
	return rc;
}

void ptlrpcd_stop(struct ptlrpcd_ctl *pc, int force)
{
	if (!test_bit(LIOD_START, &pc->pc_flags)) {
		CWARN("Thread for pc %p was not started\n", pc);
		return;
	}

	set_bit(LIOD_STOP, &pc->pc_flags);
	if (force)
		set_bit(LIOD_FORCE, &pc->pc_flags);
	wake_up(&pc->pc_set->set_waitq);
}

void ptlrpcd_free(struct ptlrpcd_ctl *pc)
{
	struct ptlrpc_request_set *set = pc->pc_set;

	if (!test_bit(LIOD_START, &pc->pc_flags)) {
		CWARN("Thread for pc %p was not started\n", pc);
		goto out;
	}

	wait_for_completion(&pc->pc_finishing);
	lu_context_fini(&pc->pc_env.le_ctx);

	spin_lock(&pc->pc_lock);
	pc->pc_set = NULL;
	spin_unlock(&pc->pc_lock);
	ptlrpc_set_destroy(set);

	clear_bit(LIOD_START, &pc->pc_flags);
	clear_bit(LIOD_STOP, &pc->pc_flags);
	clear_bit(LIOD_FORCE, &pc->pc_flags);
	clear_bit(LIOD_BIND, &pc->pc_flags);

out:
	if (pc->pc_npartners > 0) {
		LASSERT(pc->pc_partners != NULL);

		OBD_FREE(pc->pc_partners,
			 sizeof(struct ptlrpcd_ctl *) * pc->pc_npartners);
		pc->pc_partners = NULL;
	}
	pc->pc_npartners = 0;
}

static void ptlrpcd_fini(void)
{
	int i;

	if (ptlrpcds != NULL) {
		for (i = 0; i < ptlrpcds->pd_nthreads; i++)
			ptlrpcd_stop(&ptlrpcds->pd_threads[i], 0);
		for (i = 0; i < ptlrpcds->pd_nthreads; i++)
			ptlrpcd_free(&ptlrpcds->pd_threads[i]);
		ptlrpcd_stop(&ptlrpcds->pd_thread_rcv, 0);
		ptlrpcd_free(&ptlrpcds->pd_thread_rcv);
		OBD_FREE(ptlrpcds, ptlrpcds->pd_size);
		ptlrpcds = NULL;
	}
}

static int ptlrpcd_init(void)
{
	int nthreads = num_online_cpus();
	char name[16];
	int size, i = -1, j, rc = 0;

	if (max_ptlrpcds > 0 && max_ptlrpcds < nthreads)
		nthreads = max_ptlrpcds;
	if (nthreads < 2)
		nthreads = 2;
	if (nthreads < 3 && ptlrpcd_bind_policy == PDB_POLICY_NEIGHBOR)
		ptlrpcd_bind_policy = PDB_POLICY_PAIR;
	else if (nthreads % 2 != 0 && ptlrpcd_bind_policy == PDB_POLICY_PAIR)
		nthreads &= ~1; /* make sure it is even */

	size = offsetof(struct ptlrpcd, pd_threads[nthreads]);
	OBD_ALLOC(ptlrpcds, size);
	if (ptlrpcds == NULL) {
		rc = -ENOMEM;
		goto out;
	}

	snprintf(name, sizeof(name), "ptlrpcd_rcv");
	set_bit(LIOD_RECOVERY, &ptlrpcds->pd_thread_rcv.pc_flags);
	rc = ptlrpcd_start(-1, nthreads, name, &ptlrpcds->pd_thread_rcv);
	if (rc < 0)
		goto out;

	/* XXX: We start nthreads ptlrpc daemons.  Each of them can process
	 * any non-recovery async RPC, to improve overall async RPC
	 * efficiency.
	 *
	 * But there are some issues when async I/O RPCs and async non-I/O
	 * RPCs are processed in the same set in some cases: the ptlrpcd may
	 * be blocked by some async I/O RPC(s), which then prevents other
	 * async non-I/O RPC(s) from being processed in time.
	 *
	 * Maybe we should distinguish blocking async RPCs from non-blocking
	 * async RPCs, and process them in different ptlrpcd sets to avoid
	 * the unnecessary dependency.  But how to distribute the async RPC
	 * load among all the ptlrpc daemons then becomes another problem. */
	for (i = 0; i < nthreads; i++) {
		snprintf(name, sizeof(name), "ptlrpcd_%d", i);
		rc = ptlrpcd_start(i, nthreads, name, &ptlrpcds->pd_threads[i]);
		if (rc < 0)
			goto out;
	}

	ptlrpcds->pd_size = size;
	ptlrpcds->pd_index = 0;
	ptlrpcds->pd_nthreads = nthreads;

out:
	if (rc != 0 && ptlrpcds != NULL) {
		for (j = 0; j <= i; j++)
			ptlrpcd_stop(&ptlrpcds->pd_threads[j], 0);
		for (j = 0; j <= i; j++)
			ptlrpcd_free(&ptlrpcds->pd_threads[j]);
		ptlrpcd_stop(&ptlrpcds->pd_thread_rcv, 0);
		ptlrpcd_free(&ptlrpcds->pd_thread_rcv);
		OBD_FREE(ptlrpcds, size);
		ptlrpcds = NULL;
	}

	return 0;
}
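
/*
 * Worked example of the sizing in ptlrpcd_init() (a sketch): on a node with
 * 6 online CPUs and max_ptlrpcds left at 0, nthreads stays 6; with
 * max_ptlrpcds=5 and the default PDB_POLICY_PAIR it is rounded down to 4 so
 * that threads can be paired; it is never allowed to drop below 2.
 */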

int ptlrpcd_addref(void)
{
	int rc = 0;

	mutex_lock(&ptlrpcd_mutex);
	if (++ptlrpcd_users == 1)
		rc = ptlrpcd_init();
	mutex_unlock(&ptlrpcd_mutex);
	return rc;
}
EXPORT_SYMBOL(ptlrpcd_addref);

void ptlrpcd_decref(void)
{
	mutex_lock(&ptlrpcd_mutex);
	if (--ptlrpcd_users == 0)
		ptlrpcd_fini();
	mutex_unlock(&ptlrpcd_mutex);
}
EXPORT_SYMBOL(ptlrpcd_decref);
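
/*
 * Typical lifetime of the reference count above (a sketch; the surrounding
 * caller code is hypothetical).  The first ptlrpcd_addref() spawns the
 * daemons via ptlrpcd_init(), the last ptlrpcd_decref() tears them down:
 *
 *	rc = ptlrpcd_addref();
 *	if (rc != 0)
 *		return rc;
 *	...
 *	ptlrpcd_add_req(req, PDL_POLICY_ROUND, -1);
 *	...
 *	ptlrpcd_decref();
 */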
/** @} ptlrpcd */