workqueue: reimplement WQ_HIGHPRI using a separate worker_pool
1 /*
2  * kernel/workqueue.c - generic async execution with shared worker pool
3  *
4  * Copyright (C) 2002           Ingo Molnar
5  *
6  *   Derived from the taskqueue/keventd code by:
7  *     David Woodhouse <dwmw2@infradead.org>
8  *     Andrew Morton
9  *     Kai Petzke <wpp@marie.physik.tu-berlin.de>
10  *     Theodore Ts'o <tytso@mit.edu>
11  *
12  * Made to use alloc_percpu by Christoph Lameter.
13  *
14  * Copyright (C) 2010           SUSE Linux Products GmbH
15  * Copyright (C) 2010           Tejun Heo <tj@kernel.org>
16  *
17  * This is the generic async execution mechanism.  Work items are
18  * executed in process context.  The worker pool is shared and
19  * automatically managed.  There is one worker pool for each CPU and
20  * one extra for works which are better served by workers which are
21  * not bound to any specific CPU.
22  *
23  * Please read Documentation/workqueue.txt for details.
24  */
25
26 #include <linux/export.h>
27 #include <linux/kernel.h>
28 #include <linux/sched.h>
29 #include <linux/init.h>
30 #include <linux/signal.h>
31 #include <linux/completion.h>
32 #include <linux/workqueue.h>
33 #include <linux/slab.h>
34 #include <linux/cpu.h>
35 #include <linux/notifier.h>
36 #include <linux/kthread.h>
37 #include <linux/hardirq.h>
38 #include <linux/mempolicy.h>
39 #include <linux/freezer.h>
40 #include <linux/kallsyms.h>
41 #include <linux/debug_locks.h>
42 #include <linux/lockdep.h>
43 #include <linux/idr.h>
44
45 #include "workqueue_sched.h"
46
47 enum {
48         /* global_cwq flags */
49         GCWQ_DISASSOCIATED      = 1 << 0,       /* cpu can't serve workers */
50         GCWQ_FREEZING           = 1 << 1,       /* freeze in progress */
51
52         /* pool flags */
53         POOL_MANAGE_WORKERS     = 1 << 0,       /* need to manage workers */
54         POOL_MANAGING_WORKERS   = 1 << 1,       /* managing workers */
55
56         /* worker flags */
57         WORKER_STARTED          = 1 << 0,       /* started */
58         WORKER_DIE              = 1 << 1,       /* die die die */
59         WORKER_IDLE             = 1 << 2,       /* is idle */
60         WORKER_PREP             = 1 << 3,       /* preparing to run works */
61         WORKER_ROGUE            = 1 << 4,       /* not bound to any cpu */
62         WORKER_REBIND           = 1 << 5,       /* mom is home, come back */
63         WORKER_CPU_INTENSIVE    = 1 << 6,       /* cpu intensive */
64         WORKER_UNBOUND          = 1 << 7,       /* worker is unbound */
65
66         WORKER_NOT_RUNNING      = WORKER_PREP | WORKER_ROGUE | WORKER_REBIND |
67                                   WORKER_CPU_INTENSIVE | WORKER_UNBOUND,
68
69         /* gcwq->trustee_state */
70         TRUSTEE_START           = 0,            /* start */
71         TRUSTEE_IN_CHARGE       = 1,            /* trustee in charge of gcwq */
72         TRUSTEE_BUTCHER         = 2,            /* butcher workers */
73         TRUSTEE_RELEASE         = 3,            /* release workers */
74         TRUSTEE_DONE            = 4,            /* trustee is done */
75
76         NR_WORKER_POOLS         = 2,            /* # worker pools per gcwq */
77
78         BUSY_WORKER_HASH_ORDER  = 6,            /* 64 pointers */
79         BUSY_WORKER_HASH_SIZE   = 1 << BUSY_WORKER_HASH_ORDER,
80         BUSY_WORKER_HASH_MASK   = BUSY_WORKER_HASH_SIZE - 1,
81
82         MAX_IDLE_WORKERS_RATIO  = 4,            /* 1/4 of busy can be idle */
83         IDLE_WORKER_TIMEOUT     = 300 * HZ,     /* keep idle ones for 5 mins */
84
85         MAYDAY_INITIAL_TIMEOUT  = HZ / 100 >= 2 ? HZ / 100 : 2,
86                                                 /* call for help after 10ms
87                                                    (min two ticks) */
88         MAYDAY_INTERVAL         = HZ / 10,      /* and then every 100ms */
89         CREATE_COOLDOWN         = HZ,           /* time to breathe after fail */
90         TRUSTEE_COOLDOWN        = HZ / 10,      /* for trustee draining */
91
92         /*
93          * Rescue workers are used only in emergencies and shared by
94          * all cpus.  Give them and highpri workers nice level -20.
95          */
96         RESCUER_NICE_LEVEL      = -20,
97         HIGHPRI_NICE_LEVEL      = -20,
98 };
99
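/*
 * Editorial worked example (not in the original source): with HZ == 1000,
 * MAYDAY_INITIAL_TIMEOUT is HZ / 100 == 10 ticks, i.e. 10ms.  With HZ == 100,
 * HZ / 100 == 1 tick falls below the two-tick minimum, so the constant
 * evaluates to 2 ticks, i.e. 20ms.  MAYDAY_INTERVAL is always HZ / 10, 100ms.
 */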
100 /*
101  * Structure fields follow one of the following exclusion rules.
102  *
103  * I: Modifiable by initialization/destruction paths and read-only for
104  *    everyone else.
105  *
106  * P: Preemption protected.  Disabling preemption is enough and should
107  *    only be modified and accessed from the local cpu.
108  *
109  * L: gcwq->lock protected.  Access with gcwq->lock held.
110  *
111  * X: During normal operation, modification requires gcwq->lock and
112  *    should be done only from local cpu.  Either disabling preemption
113  *    on local cpu or grabbing gcwq->lock is enough for read access.
114  *    If GCWQ_DISASSOCIATED is set, it's identical to L.
115  *
116  * F: wq->flush_mutex protected.
117  *
118  * W: workqueue_lock protected.
119  */
120
121 struct global_cwq;
122 struct worker_pool;
123
124 /*
125  * The poor guys doing the actual heavy lifting.  All on-duty workers
126  * are either serving the manager role, on idle list or on busy hash.
127  */
128 struct worker {
129         /* on idle list while idle, on busy hash table while busy */
130         union {
131                 struct list_head        entry;  /* L: while idle */
132                 struct hlist_node       hentry; /* L: while busy */
133         };
134
135         struct work_struct      *current_work;  /* L: work being processed */
136         struct cpu_workqueue_struct *current_cwq; /* L: current_work's cwq */
137         struct list_head        scheduled;      /* L: scheduled works */
138         struct task_struct      *task;          /* I: worker task */
139         struct worker_pool      *pool;          /* I: the associated pool */
140         /* 64 bytes boundary on 64bit, 32 on 32bit */
141         unsigned long           last_active;    /* L: last active timestamp */
142         unsigned int            flags;          /* X: flags */
143         int                     id;             /* I: worker id */
144         struct work_struct      rebind_work;    /* L: rebind worker to cpu */
145 };
146
147 struct worker_pool {
148         struct global_cwq       *gcwq;          /* I: the owning gcwq */
149         unsigned int            flags;          /* X: flags */
150
151         struct list_head        worklist;       /* L: list of pending works */
152         int                     nr_workers;     /* L: total number of workers */
153         int                     nr_idle;        /* L: currently idle ones */
154
155         struct list_head        idle_list;      /* X: list of idle workers */
156         struct timer_list       idle_timer;     /* L: worker idle timeout */
157         struct timer_list       mayday_timer;   /* L: SOS timer for workers */
158
159         struct ida              worker_ida;     /* L: for worker IDs */
160         struct worker           *first_idle;    /* L: first idle worker */
161 };
162
163 /*
164  * Global per-cpu workqueue.  There's one and only one for each cpu
165  * and all works are queued and processed here regardless of their
166  * target workqueues.
167  */
168 struct global_cwq {
169         spinlock_t              lock;           /* the gcwq lock */
170         unsigned int            cpu;            /* I: the associated cpu */
171         unsigned int            flags;          /* L: GCWQ_* flags */
172
173         /* workers are chained either in busy_hash or pool idle_list */
174         struct hlist_head       busy_hash[BUSY_WORKER_HASH_SIZE];
175                                                 /* L: hash of busy workers */
176
177         struct worker_pool      pools[2];       /* normal and highpri pools */
178
179         struct task_struct      *trustee;       /* L: for gcwq shutdown */
180         unsigned int            trustee_state;  /* L: trustee state */
181         wait_queue_head_t       trustee_wait;   /* trustee wait */
182 } ____cacheline_aligned_in_smp;
183
184 /*
185  * The per-CPU workqueue.  The lower WORK_STRUCT_FLAG_BITS of
186  * work_struct->data are used for flags and thus cwqs need to be
187  * aligned on a (1 << WORK_STRUCT_FLAG_BITS) byte boundary.
188  */
189 struct cpu_workqueue_struct {
190         struct worker_pool      *pool;          /* I: the associated pool */
191         struct workqueue_struct *wq;            /* I: the owning workqueue */
192         int                     work_color;     /* L: current color */
193         int                     flush_color;    /* L: flushing color */
194         int                     nr_in_flight[WORK_NR_COLORS];
195                                                 /* L: nr of in_flight works */
196         int                     nr_active;      /* L: nr of active works */
197         int                     max_active;     /* L: max active works */
198         struct list_head        delayed_works;  /* L: delayed works */
199 };
200
201 /*
202  * Structure used to wait for workqueue flush.
203  */
204 struct wq_flusher {
205         struct list_head        list;           /* F: list of flushers */
206         int                     flush_color;    /* F: flush color waiting for */
207         struct completion       done;           /* flush completion */
208 };
209
210 /*
211  * All cpumasks are assumed to be always set on UP and thus can't be
212  * used to determine whether there's something to be done.
213  */
214 #ifdef CONFIG_SMP
215 typedef cpumask_var_t mayday_mask_t;
216 #define mayday_test_and_set_cpu(cpu, mask)      \
217         cpumask_test_and_set_cpu((cpu), (mask))
218 #define mayday_clear_cpu(cpu, mask)             cpumask_clear_cpu((cpu), (mask))
219 #define for_each_mayday_cpu(cpu, mask)          for_each_cpu((cpu), (mask))
220 #define alloc_mayday_mask(maskp, gfp)           zalloc_cpumask_var((maskp), (gfp))
221 #define free_mayday_mask(mask)                  free_cpumask_var((mask))
222 #else
223 typedef unsigned long mayday_mask_t;
224 #define mayday_test_and_set_cpu(cpu, mask)      test_and_set_bit(0, &(mask))
225 #define mayday_clear_cpu(cpu, mask)             clear_bit(0, &(mask))
226 #define for_each_mayday_cpu(cpu, mask)          if ((cpu) = 0, (mask))
227 #define alloc_mayday_mask(maskp, gfp)           true
228 #define free_mayday_mask(mask)                  do { } while (0)
229 #endif
230
231 /*
232  * The externally visible workqueue abstraction is an array of
233  * per-CPU workqueues:
234  */
235 struct workqueue_struct {
236         unsigned int            flags;          /* W: WQ_* flags */
237         union {
238                 struct cpu_workqueue_struct __percpu    *pcpu;
239                 struct cpu_workqueue_struct             *single;
240                 unsigned long                           v;
241         } cpu_wq;                               /* I: cwq's */
242         struct list_head        list;           /* W: list of all workqueues */
243
244         struct mutex            flush_mutex;    /* protects wq flushing */
245         int                     work_color;     /* F: current work color */
246         int                     flush_color;    /* F: current flush color */
247         atomic_t                nr_cwqs_to_flush; /* flush in progress */
248         struct wq_flusher       *first_flusher; /* F: first flusher */
249         struct list_head        flusher_queue;  /* F: flush waiters */
250         struct list_head        flusher_overflow; /* F: flush overflow list */
251
252         mayday_mask_t           mayday_mask;    /* cpus requesting rescue */
253         struct worker           *rescuer;       /* I: rescue worker */
254
255         int                     nr_drainers;    /* W: drain in progress */
256         int                     saved_max_active; /* W: saved cwq max_active */
257 #ifdef CONFIG_LOCKDEP
258         struct lockdep_map      lockdep_map;
259 #endif
260         char                    name[];         /* I: workqueue name */
261 };
262
263 struct workqueue_struct *system_wq __read_mostly;
264 struct workqueue_struct *system_long_wq __read_mostly;
265 struct workqueue_struct *system_nrt_wq __read_mostly;
266 struct workqueue_struct *system_unbound_wq __read_mostly;
267 struct workqueue_struct *system_freezable_wq __read_mostly;
268 struct workqueue_struct *system_nrt_freezable_wq __read_mostly;
269 EXPORT_SYMBOL_GPL(system_wq);
270 EXPORT_SYMBOL_GPL(system_long_wq);
271 EXPORT_SYMBOL_GPL(system_nrt_wq);
272 EXPORT_SYMBOL_GPL(system_unbound_wq);
273 EXPORT_SYMBOL_GPL(system_freezable_wq);
274 EXPORT_SYMBOL_GPL(system_nrt_freezable_wq);
275
276 #define CREATE_TRACE_POINTS
277 #include <trace/events/workqueue.h>
278
279 #define for_each_worker_pool(pool, gcwq)                                \
280         for ((pool) = &(gcwq)->pools[0];                                \
281              (pool) < &(gcwq)->pools[NR_WORKER_POOLS]; (pool)++)
282
283 #define for_each_busy_worker(worker, i, pos, gcwq)                      \
284         for (i = 0; i < BUSY_WORKER_HASH_SIZE; i++)                     \
285                 hlist_for_each_entry(worker, pos, &gcwq->busy_hash[i], hentry)
286
287 static inline int __next_gcwq_cpu(int cpu, const struct cpumask *mask,
288                                   unsigned int sw)
289 {
290         if (cpu < nr_cpu_ids) {
291                 if (sw & 1) {
292                         cpu = cpumask_next(cpu, mask);
293                         if (cpu < nr_cpu_ids)
294                                 return cpu;
295                 }
296                 if (sw & 2)
297                         return WORK_CPU_UNBOUND;
298         }
299         return WORK_CPU_NONE;
300 }
301
302 static inline int __next_wq_cpu(int cpu, const struct cpumask *mask,
303                                 struct workqueue_struct *wq)
304 {
305         return __next_gcwq_cpu(cpu, mask, !(wq->flags & WQ_UNBOUND) ? 1 : 2);
306 }
307
308 /*
309  * CPU iterators
310  *
311  * An extra gcwq is defined for an invalid cpu number
312  * (WORK_CPU_UNBOUND) to host workqueues which are not bound to any
313  * specific CPU.  The following iterators are similar to
314  * for_each_*_cpu() iterators but also considers the unbound gcwq.
315  *
316  * for_each_gcwq_cpu()          : possible CPUs + WORK_CPU_UNBOUND
317  * for_each_online_gcwq_cpu()   : online CPUs + WORK_CPU_UNBOUND
318  * for_each_cwq_cpu()           : possible CPUs for bound workqueues,
319  *                                WORK_CPU_UNBOUND for unbound workqueues
320  */
321 #define for_each_gcwq_cpu(cpu)                                          \
322         for ((cpu) = __next_gcwq_cpu(-1, cpu_possible_mask, 3);         \
323              (cpu) < WORK_CPU_NONE;                                     \
324              (cpu) = __next_gcwq_cpu((cpu), cpu_possible_mask, 3))
325
326 #define for_each_online_gcwq_cpu(cpu)                                   \
327         for ((cpu) = __next_gcwq_cpu(-1, cpu_online_mask, 3);           \
328              (cpu) < WORK_CPU_NONE;                                     \
329              (cpu) = __next_gcwq_cpu((cpu), cpu_online_mask, 3))
330
331 #define for_each_cwq_cpu(cpu, wq)                                       \
332         for ((cpu) = __next_wq_cpu(-1, cpu_possible_mask, (wq));        \
333              (cpu) < WORK_CPU_NONE;                                     \
334              (cpu) = __next_wq_cpu((cpu), cpu_possible_mask, (wq)))
335
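/*
 * Usage sketch (editorial illustration): the gcwq iterators visit every
 * per-cpu gcwq and finish with the unbound one, mirroring how
 * is_chained_work() below walks all gcwqs:
 *
 *	unsigned int cpu;
 *
 *	for_each_gcwq_cpu(cpu) {
 *		struct global_cwq *gcwq = get_gcwq(cpu);
 *
 *		...
 *	}
 */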
336 #ifdef CONFIG_DEBUG_OBJECTS_WORK
337
338 static struct debug_obj_descr work_debug_descr;
339
340 static void *work_debug_hint(void *addr)
341 {
342         return ((struct work_struct *) addr)->func;
343 }
344
345 /*
346  * fixup_init is called when:
347  * - an active object is initialized
348  */
349 static int work_fixup_init(void *addr, enum debug_obj_state state)
350 {
351         struct work_struct *work = addr;
352
353         switch (state) {
354         case ODEBUG_STATE_ACTIVE:
355                 cancel_work_sync(work);
356                 debug_object_init(work, &work_debug_descr);
357                 return 1;
358         default:
359                 return 0;
360         }
361 }
362
363 /*
364  * fixup_activate is called when:
365  * - an active object is activated
366  * - an unknown object is activated (might be a statically initialized object)
367  */
368 static int work_fixup_activate(void *addr, enum debug_obj_state state)
369 {
370         struct work_struct *work = addr;
371
372         switch (state) {
373
374         case ODEBUG_STATE_NOTAVAILABLE:
375                 /*
376                  * This is not really a fixup. The work struct was
377                  * statically initialized. We just make sure that it
378                  * is tracked in the object tracker.
379                  */
380                 if (test_bit(WORK_STRUCT_STATIC_BIT, work_data_bits(work))) {
381                         debug_object_init(work, &work_debug_descr);
382                         debug_object_activate(work, &work_debug_descr);
383                         return 0;
384                 }
385                 WARN_ON_ONCE(1);
386                 return 0;
387
388         case ODEBUG_STATE_ACTIVE:
389                 WARN_ON(1);
390
391         default:
392                 return 0;
393         }
394 }
395
396 /*
397  * fixup_free is called when:
398  * - an active object is freed
399  */
400 static int work_fixup_free(void *addr, enum debug_obj_state state)
401 {
402         struct work_struct *work = addr;
403
404         switch (state) {
405         case ODEBUG_STATE_ACTIVE:
406                 cancel_work_sync(work);
407                 debug_object_free(work, &work_debug_descr);
408                 return 1;
409         default:
410                 return 0;
411         }
412 }
413
414 static struct debug_obj_descr work_debug_descr = {
415         .name           = "work_struct",
416         .debug_hint     = work_debug_hint,
417         .fixup_init     = work_fixup_init,
418         .fixup_activate = work_fixup_activate,
419         .fixup_free     = work_fixup_free,
420 };
421
422 static inline void debug_work_activate(struct work_struct *work)
423 {
424         debug_object_activate(work, &work_debug_descr);
425 }
426
427 static inline void debug_work_deactivate(struct work_struct *work)
428 {
429         debug_object_deactivate(work, &work_debug_descr);
430 }
431
432 void __init_work(struct work_struct *work, int onstack)
433 {
434         if (onstack)
435                 debug_object_init_on_stack(work, &work_debug_descr);
436         else
437                 debug_object_init(work, &work_debug_descr);
438 }
439 EXPORT_SYMBOL_GPL(__init_work);
440
441 void destroy_work_on_stack(struct work_struct *work)
442 {
443         debug_object_free(work, &work_debug_descr);
444 }
445 EXPORT_SYMBOL_GPL(destroy_work_on_stack);
446
447 #else
448 static inline void debug_work_activate(struct work_struct *work) { }
449 static inline void debug_work_deactivate(struct work_struct *work) { }
450 #endif
451
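/*
 * Usage sketch (editorial illustration, hypothetical caller): on-stack work
 * items should be set up with INIT_WORK_ONSTACK() and released with
 * destroy_work_on_stack() so that the debugobjects tracking above stays
 * consistent:
 *
 *	struct work_struct work;
 *
 *	INIT_WORK_ONSTACK(&work, my_fn);
 *	schedule_work(&work);
 *	flush_work(&work);
 *	destroy_work_on_stack(&work);
 */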
452 /* Serializes the accesses to the list of workqueues. */
453 static DEFINE_SPINLOCK(workqueue_lock);
454 static LIST_HEAD(workqueues);
455 static bool workqueue_freezing;         /* W: have wqs started freezing? */
456
457 /*
458  * The almighty global cpu workqueues.  nr_running is the only field
459  * which is expected to be used frequently by other cpus via
460  * try_to_wake_up().  Put it in a separate cacheline.
461  */
462 static DEFINE_PER_CPU(struct global_cwq, global_cwq);
463 static DEFINE_PER_CPU_SHARED_ALIGNED(atomic_t, pool_nr_running[NR_WORKER_POOLS]);
464
465 /*
466  * Global cpu workqueue and nr_running counter for unbound gcwq.  The
467  * gcwq is always online, has GCWQ_DISASSOCIATED set, and all its
468  * workers have WORKER_UNBOUND set.
469  */
470 static struct global_cwq unbound_global_cwq;
471 static atomic_t unbound_pool_nr_running[NR_WORKER_POOLS] = {
472         [0 ... NR_WORKER_POOLS - 1]     = ATOMIC_INIT(0),       /* always 0 */
473 };
474
475 static int worker_thread(void *__worker);
476
477 static int worker_pool_pri(struct worker_pool *pool)
478 {
479         return pool - pool->gcwq->pools;
480 }
481
482 static struct global_cwq *get_gcwq(unsigned int cpu)
483 {
484         if (cpu != WORK_CPU_UNBOUND)
485                 return &per_cpu(global_cwq, cpu);
486         else
487                 return &unbound_global_cwq;
488 }
489
490 static atomic_t *get_pool_nr_running(struct worker_pool *pool)
491 {
492         int cpu = pool->gcwq->cpu;
493         int idx = worker_pool_pri(pool);
494
495         if (cpu != WORK_CPU_UNBOUND)
496                 return &per_cpu(pool_nr_running, cpu)[idx];
497         else
498                 return &unbound_pool_nr_running[idx];
499 }
500
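/*
 * Editorial illustration (not part of the original file): the highpri pool
 * introduced for WQ_HIGHPRI is simply gcwq->pools[1].  worker_pool_pri()
 * recovers the index by pointer arithmetic and get_pool_nr_running() uses
 * it to pick the matching nr_running counter, e.g.:
 *
 *	struct global_cwq *gcwq = get_gcwq(cpu);
 *	struct worker_pool *pool = &gcwq->pools[1];
 *
 *	worker_pool_pri(pool) == 1
 *	get_pool_nr_running(pool) == &per_cpu(pool_nr_running, cpu)[1]
 *		(for cpu != WORK_CPU_UNBOUND)
 */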
501 static struct cpu_workqueue_struct *get_cwq(unsigned int cpu,
502                                             struct workqueue_struct *wq)
503 {
504         if (!(wq->flags & WQ_UNBOUND)) {
505                 if (likely(cpu < nr_cpu_ids))
506                         return per_cpu_ptr(wq->cpu_wq.pcpu, cpu);
507         } else if (likely(cpu == WORK_CPU_UNBOUND))
508                 return wq->cpu_wq.single;
509         return NULL;
510 }
511
512 static unsigned int work_color_to_flags(int color)
513 {
514         return color << WORK_STRUCT_COLOR_SHIFT;
515 }
516
517 static int get_work_color(struct work_struct *work)
518 {
519         return (*work_data_bits(work) >> WORK_STRUCT_COLOR_SHIFT) &
520                 ((1 << WORK_STRUCT_COLOR_BITS) - 1);
521 }
522
523 static int work_next_color(int color)
524 {
525         return (color + 1) % WORK_NR_COLORS;
526 }
527
528 /*
529  * A work's data points to the cwq with WORK_STRUCT_CWQ set while the
530  * work is on queue.  Once execution starts, WORK_STRUCT_CWQ is
531  * cleared and the work data contains the cpu number it was last on.
532  *
533  * set_work_{cwq|cpu}() and clear_work_data() can be used to set the
534  * cwq, cpu or clear work->data.  These functions should only be
535  * called while the work is owned - ie. while the PENDING bit is set.
536  *
537  * get_work_[g]cwq() can be used to obtain the gcwq or cwq
538  * corresponding to a work.  gcwq is available once the work has been
539  * queued anywhere after initialization.  cwq is available only from
540  * queueing until execution starts.
541  */
542 static inline void set_work_data(struct work_struct *work, unsigned long data,
543                                  unsigned long flags)
544 {
545         BUG_ON(!work_pending(work));
546         atomic_long_set(&work->data, data | flags | work_static(work));
547 }
548
549 static void set_work_cwq(struct work_struct *work,
550                          struct cpu_workqueue_struct *cwq,
551                          unsigned long extra_flags)
552 {
553         set_work_data(work, (unsigned long)cwq,
554                       WORK_STRUCT_PENDING | WORK_STRUCT_CWQ | extra_flags);
555 }
556
557 static void set_work_cpu(struct work_struct *work, unsigned int cpu)
558 {
559         set_work_data(work, cpu << WORK_STRUCT_FLAG_BITS, WORK_STRUCT_PENDING);
560 }
561
562 static void clear_work_data(struct work_struct *work)
563 {
564         set_work_data(work, WORK_STRUCT_NO_CPU, 0);
565 }
566
567 static struct cpu_workqueue_struct *get_work_cwq(struct work_struct *work)
568 {
569         unsigned long data = atomic_long_read(&work->data);
570
571         if (data & WORK_STRUCT_CWQ)
572                 return (void *)(data & WORK_STRUCT_WQ_DATA_MASK);
573         else
574                 return NULL;
575 }
576
577 static struct global_cwq *get_work_gcwq(struct work_struct *work)
578 {
579         unsigned long data = atomic_long_read(&work->data);
580         unsigned int cpu;
581
582         if (data & WORK_STRUCT_CWQ)
583                 return ((struct cpu_workqueue_struct *)
584                         (data & WORK_STRUCT_WQ_DATA_MASK))->pool->gcwq;
585
586         cpu = data >> WORK_STRUCT_FLAG_BITS;
587         if (cpu == WORK_CPU_NONE)
588                 return NULL;
589
590         BUG_ON(cpu >= nr_cpu_ids && cpu != WORK_CPU_UNBOUND);
591         return get_gcwq(cpu);
592 }
593
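/*
 * Worked illustration (editorial): while a work is queued, its data word is
 * the cwq pointer with WORK_STRUCT_CWQ set, so both get_work_cwq() and
 * get_work_gcwq() resolve.  Once execution starts and set_work_cpu() has
 * been called, only the last cpu remains:
 *
 *	get_work_cwq(work)  == NULL
 *	get_work_gcwq(work) == get_gcwq(cpu the work last ran on)
 */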
594 /*
595  * Policy functions.  These define the policies on how the global worker
596  * pools are managed.  Unless noted otherwise, these functions assume that
597  * they're being called with gcwq->lock held.
598  */
599
600 static bool __need_more_worker(struct worker_pool *pool)
601 {
602         return !atomic_read(get_pool_nr_running(pool));
603 }
604
605 /*
606  * Need to wake up a worker?  Called from anything but currently
607  * running workers.
608  *
609  * Note that, because unbound workers never contribute to nr_running, this
610  * function will always return %true for unbound gcwq as long as the
611  * worklist isn't empty.
612  */
613 static bool need_more_worker(struct worker_pool *pool)
614 {
615         return !list_empty(&pool->worklist) && __need_more_worker(pool);
616 }
617
618 /* Can I start working?  Called from busy but !running workers. */
619 static bool may_start_working(struct worker_pool *pool)
620 {
621         return pool->nr_idle;
622 }
623
624 /* Do I need to keep working?  Called from currently running workers. */
625 static bool keep_working(struct worker_pool *pool)
626 {
627         atomic_t *nr_running = get_pool_nr_running(pool);
628
629         return !list_empty(&pool->worklist) && atomic_read(nr_running) <= 1;
630 }
631
632 /* Do we need a new worker?  Called from manager. */
633 static bool need_to_create_worker(struct worker_pool *pool)
634 {
635         return need_more_worker(pool) && !may_start_working(pool);
636 }
637
638 /* Do I need to be the manager? */
639 static bool need_to_manage_workers(struct worker_pool *pool)
640 {
641         return need_to_create_worker(pool) ||
642                 (pool->flags & POOL_MANAGE_WORKERS);
643 }
644
645 /* Do we have too many workers and should some go away? */
646 static bool too_many_workers(struct worker_pool *pool)
647 {
648         bool managing = pool->flags & POOL_MANAGING_WORKERS;
649         int nr_idle = pool->nr_idle + managing; /* manager is considered idle */
650         int nr_busy = pool->nr_workers - nr_idle;
651
652         return nr_idle > 2 && (nr_idle - 2) * MAX_IDLE_WORKERS_RATIO >= nr_busy;
653 }
654
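/*
 * Worked example (editorial): with MAX_IDLE_WORKERS_RATIO == 4 and 8 busy
 * workers, too_many_workers() stays false while nr_idle <= 3 and turns true
 * at nr_idle == 4, since (4 - 2) * 4 >= 8.  Two idle workers are always
 * tolerated no matter how few workers are busy.
 */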
655 /*
656  * Wake up functions.
657  */
658
659 /* Return the first worker.  Safe with preemption disabled */
660 static struct worker *first_worker(struct worker_pool *pool)
661 {
662         if (unlikely(list_empty(&pool->idle_list)))
663                 return NULL;
664
665         return list_first_entry(&pool->idle_list, struct worker, entry);
666 }
667
668 /**
669  * wake_up_worker - wake up an idle worker
670  * @pool: worker pool to wake worker from
671  *
672  * Wake up the first idle worker of @pool.
673  *
674  * CONTEXT:
675  * spin_lock_irq(gcwq->lock).
676  */
677 static void wake_up_worker(struct worker_pool *pool)
678 {
679         struct worker *worker = first_worker(pool);
680
681         if (likely(worker))
682                 wake_up_process(worker->task);
683 }
684
685 /**
686  * wq_worker_waking_up - a worker is waking up
687  * @task: task waking up
688  * @cpu: CPU @task is waking up to
689  *
690  * This function is called during try_to_wake_up() when a worker is
691  * being awoken.
692  *
693  * CONTEXT:
694  * spin_lock_irq(rq->lock)
695  */
696 void wq_worker_waking_up(struct task_struct *task, unsigned int cpu)
697 {
698         struct worker *worker = kthread_data(task);
699
700         if (!(worker->flags & WORKER_NOT_RUNNING))
701                 atomic_inc(get_pool_nr_running(worker->pool));
702 }
703
704 /**
705  * wq_worker_sleeping - a worker is going to sleep
706  * @task: task going to sleep
707  * @cpu: CPU in question, must be the current CPU number
708  *
709  * This function is called during schedule() when a busy worker is
710  * going to sleep.  A worker on the same cpu can be woken up by
711  * returning a pointer to its task.
712  *
713  * CONTEXT:
714  * spin_lock_irq(rq->lock)
715  *
716  * RETURNS:
717  * Worker task on @cpu to wake up, %NULL if none.
718  */
719 struct task_struct *wq_worker_sleeping(struct task_struct *task,
720                                        unsigned int cpu)
721 {
722         struct worker *worker = kthread_data(task), *to_wakeup = NULL;
723         struct worker_pool *pool = worker->pool;
724         atomic_t *nr_running = get_pool_nr_running(pool);
725
726         if (worker->flags & WORKER_NOT_RUNNING)
727                 return NULL;
728
729         /* this can only happen on the local cpu */
730         BUG_ON(cpu != raw_smp_processor_id());
731
732         /*
733          * The counterpart of the following dec_and_test, implied mb,
734          * worklist not empty test sequence is in insert_work().
735          * Please read comment there.
736          *
737          * NOT_RUNNING is clear.  This means that trustee is not in
738          * charge and we're running on the local cpu w/ rq lock held
739  * and preemption disabled, which in turn means that nobody else
740          * could be manipulating idle_list, so dereferencing idle_list
741          * without gcwq lock is safe.
742          */
743         if (atomic_dec_and_test(nr_running) && !list_empty(&pool->worklist))
744                 to_wakeup = first_worker(pool);
745         return to_wakeup ? to_wakeup->task : NULL;
746 }
747
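/*
 * Editorial note: the effect of the two hooks above is that each pool keeps
 * one worker runnable while work is pending.  When the last running worker
 * blocks, wq_worker_sleeping() drops nr_running to zero and hands the first
 * idle worker back to the scheduler to be woken in its place;
 * wq_worker_waking_up() restores the count when the worker runs again.
 */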
748 /**
749  * worker_set_flags - set worker flags and adjust nr_running accordingly
750  * @worker: self
751  * @flags: flags to set
752  * @wakeup: wakeup an idle worker if necessary
753  *
754  * Set @flags in @worker->flags and adjust nr_running accordingly.  If
755  * nr_running becomes zero and @wakeup is %true, an idle worker is
756  * woken up.
757  *
758  * CONTEXT:
759  * spin_lock_irq(gcwq->lock)
760  */
761 static inline void worker_set_flags(struct worker *worker, unsigned int flags,
762                                     bool wakeup)
763 {
764         struct worker_pool *pool = worker->pool;
765
766         WARN_ON_ONCE(worker->task != current);
767
768         /*
769          * If transitioning into NOT_RUNNING, adjust nr_running and
770          * wake up an idle worker as necessary if requested by
771          * @wakeup.
772          */
773         if ((flags & WORKER_NOT_RUNNING) &&
774             !(worker->flags & WORKER_NOT_RUNNING)) {
775                 atomic_t *nr_running = get_pool_nr_running(pool);
776
777                 if (wakeup) {
778                         if (atomic_dec_and_test(nr_running) &&
779                             !list_empty(&pool->worklist))
780                                 wake_up_worker(pool);
781                 } else
782                         atomic_dec(nr_running);
783         }
784
785         worker->flags |= flags;
786 }
787
788 /**
789  * worker_clr_flags - clear worker flags and adjust nr_running accordingly
790  * @worker: self
791  * @flags: flags to clear
792  *
793  * Clear @flags in @worker->flags and adjust nr_running accordingly.
794  *
795  * CONTEXT:
796  * spin_lock_irq(gcwq->lock)
797  */
798 static inline void worker_clr_flags(struct worker *worker, unsigned int flags)
799 {
800         struct worker_pool *pool = worker->pool;
801         unsigned int oflags = worker->flags;
802
803         WARN_ON_ONCE(worker->task != current);
804
805         worker->flags &= ~flags;
806
807         /*
808          * If transitioning out of NOT_RUNNING, increment nr_running.  Note
809  * that the nested NOT_RUNNING is not a noop.  NOT_RUNNING is a mask
810  * of multiple flags, not a single flag.
811          */
812         if ((flags & WORKER_NOT_RUNNING) && (oflags & WORKER_NOT_RUNNING))
813                 if (!(worker->flags & WORKER_NOT_RUNNING))
814                         atomic_inc(get_pool_nr_running(pool));
815 }
816
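/*
 * Editorial illustration: because WORKER_NOT_RUNNING is a mask, the flag
 * transitions above nest.  A worker that is both WORKER_PREP and
 * WORKER_CPU_INTENSIVE decrements nr_running only when the first of the two
 * is set and increments it again only once both have been cleared.
 */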
817 /**
818  * busy_worker_head - return the busy hash head for a work
819  * @gcwq: gcwq of interest
820  * @work: work to be hashed
821  *
822  * Return hash head of @gcwq for @work.
823  *
824  * CONTEXT:
825  * spin_lock_irq(gcwq->lock).
826  *
827  * RETURNS:
828  * Pointer to the hash head.
829  */
830 static struct hlist_head *busy_worker_head(struct global_cwq *gcwq,
831                                            struct work_struct *work)
832 {
833         const int base_shift = ilog2(sizeof(struct work_struct));
834         unsigned long v = (unsigned long)work;
835
836         /* simple shift and fold hash, do we need something better? */
837         v >>= base_shift;
838         v += v >> BUSY_WORKER_HASH_ORDER;
839         v &= BUSY_WORKER_HASH_MASK;
840
841         return &gcwq->busy_hash[v];
842 }
843
844 /**
845  * __find_worker_executing_work - find worker which is executing a work
846  * @gcwq: gcwq of interest
847  * @bwh: hash head as returned by busy_worker_head()
848  * @work: work to find worker for
849  *
850  * Find a worker which is executing @work on @gcwq.  @bwh should be
851  * the hash head obtained by calling busy_worker_head() with the same
852  * work.
853  *
854  * CONTEXT:
855  * spin_lock_irq(gcwq->lock).
856  *
857  * RETURNS:
858  * Pointer to worker which is executing @work if found, NULL
859  * otherwise.
860  */
861 static struct worker *__find_worker_executing_work(struct global_cwq *gcwq,
862                                                    struct hlist_head *bwh,
863                                                    struct work_struct *work)
864 {
865         struct worker *worker;
866         struct hlist_node *tmp;
867
868         hlist_for_each_entry(worker, tmp, bwh, hentry)
869                 if (worker->current_work == work)
870                         return worker;
871         return NULL;
872 }
873
874 /**
875  * find_worker_executing_work - find worker which is executing a work
876  * @gcwq: gcwq of interest
877  * @work: work to find worker for
878  *
879  * Find a worker which is executing @work on @gcwq.  This function is
880  * identical to __find_worker_executing_work() except that this
881  * function calculates @bwh itself.
882  *
883  * CONTEXT:
884  * spin_lock_irq(gcwq->lock).
885  *
886  * RETURNS:
887  * Pointer to worker which is executing @work if found, NULL
888  * otherwise.
889  */
890 static struct worker *find_worker_executing_work(struct global_cwq *gcwq,
891                                                  struct work_struct *work)
892 {
893         return __find_worker_executing_work(gcwq, busy_worker_head(gcwq, work),
894                                             work);
895 }
896
897 /**
898  * insert_work - insert a work into gcwq
899  * @cwq: cwq @work belongs to
900  * @work: work to insert
901  * @head: insertion point
902  * @extra_flags: extra WORK_STRUCT_* flags to set
903  *
904  * Insert @work which belongs to @cwq into @gcwq after @head.
905  * @extra_flags is or'd to work_struct flags.
906  *
907  * CONTEXT:
908  * spin_lock_irq(gcwq->lock).
909  */
910 static void insert_work(struct cpu_workqueue_struct *cwq,
911                         struct work_struct *work, struct list_head *head,
912                         unsigned int extra_flags)
913 {
914         struct worker_pool *pool = cwq->pool;
915
916         /* we own @work, set data and link */
917         set_work_cwq(work, cwq, extra_flags);
918
919         /*
920          * Ensure that we get the right work->data if we see the
921          * result of list_add() below, see try_to_grab_pending().
922          */
923         smp_wmb();
924
925         list_add_tail(&work->entry, head);
926
927         /*
928          * Ensure either wq_worker_sleeping() sees the above
929          * list_add_tail() or we see zero nr_running to avoid workers
930          * lying around lazily while there are works to be processed.
931          */
932         smp_mb();
933
934         if (__need_more_worker(pool))
935                 wake_up_worker(pool);
936 }
937
938 /*
939  * Test whether @work is being queued from another work executing on the
940  * same workqueue.  This is rather expensive and should only be used from
941  * cold paths.
942  */
943 static bool is_chained_work(struct workqueue_struct *wq)
944 {
945         unsigned long flags;
946         unsigned int cpu;
947
948         for_each_gcwq_cpu(cpu) {
949                 struct global_cwq *gcwq = get_gcwq(cpu);
950                 struct worker *worker;
951                 struct hlist_node *pos;
952                 int i;
953
954                 spin_lock_irqsave(&gcwq->lock, flags);
955                 for_each_busy_worker(worker, i, pos, gcwq) {
956                         if (worker->task != current)
957                                 continue;
958                         spin_unlock_irqrestore(&gcwq->lock, flags);
959                         /*
960                          * I'm @worker, no locking necessary.  See if @work
961                          * is headed to the same workqueue.
962                          */
963                         return worker->current_cwq->wq == wq;
964                 }
965                 spin_unlock_irqrestore(&gcwq->lock, flags);
966         }
967         return false;
968 }
969
970 static void __queue_work(unsigned int cpu, struct workqueue_struct *wq,
971                          struct work_struct *work)
972 {
973         struct global_cwq *gcwq;
974         struct cpu_workqueue_struct *cwq;
975         struct list_head *worklist;
976         unsigned int work_flags;
977         unsigned long flags;
978
979         debug_work_activate(work);
980
981         /* if dying, only works from the same workqueue are allowed */
982         if (unlikely(wq->flags & WQ_DRAINING) &&
983             WARN_ON_ONCE(!is_chained_work(wq)))
984                 return;
985
986         /* determine gcwq to use */
987         if (!(wq->flags & WQ_UNBOUND)) {
988                 struct global_cwq *last_gcwq;
989
990                 if (unlikely(cpu == WORK_CPU_UNBOUND))
991                         cpu = raw_smp_processor_id();
992
993                 /*
994                  * It's multi cpu.  If @wq is non-reentrant and @work
995                  * was previously on a different cpu, it might still
996                  * be running there, in which case the work needs to
997                  * be queued on that cpu to guarantee non-reentrance.
998                  */
999                 gcwq = get_gcwq(cpu);
1000                 if (wq->flags & WQ_NON_REENTRANT &&
1001                     (last_gcwq = get_work_gcwq(work)) && last_gcwq != gcwq) {
1002                         struct worker *worker;
1003
1004                         spin_lock_irqsave(&last_gcwq->lock, flags);
1005
1006                         worker = find_worker_executing_work(last_gcwq, work);
1007
1008                         if (worker && worker->current_cwq->wq == wq)
1009                                 gcwq = last_gcwq;
1010                         else {
1011                                 /* meh... not running there, queue here */
1012                                 spin_unlock_irqrestore(&last_gcwq->lock, flags);
1013                                 spin_lock_irqsave(&gcwq->lock, flags);
1014                         }
1015                 } else
1016                         spin_lock_irqsave(&gcwq->lock, flags);
1017         } else {
1018                 gcwq = get_gcwq(WORK_CPU_UNBOUND);
1019                 spin_lock_irqsave(&gcwq->lock, flags);
1020         }
1021
1022         /* gcwq determined, get cwq and queue */
1023         cwq = get_cwq(gcwq->cpu, wq);
1024         trace_workqueue_queue_work(cpu, cwq, work);
1025
1026         if (WARN_ON(!list_empty(&work->entry))) {
1027                 spin_unlock_irqrestore(&gcwq->lock, flags);
1028                 return;
1029         }
1030
1031         cwq->nr_in_flight[cwq->work_color]++;
1032         work_flags = work_color_to_flags(cwq->work_color);
1033
1034         if (likely(cwq->nr_active < cwq->max_active)) {
1035                 trace_workqueue_activate_work(work);
1036                 cwq->nr_active++;
1037                 worklist = &cwq->pool->worklist;
1038         } else {
1039                 work_flags |= WORK_STRUCT_DELAYED;
1040                 worklist = &cwq->delayed_works;
1041         }
1042
1043         insert_work(cwq, work, worklist, work_flags);
1044
1045         spin_unlock_irqrestore(&gcwq->lock, flags);
1046 }
1047
1048 /**
1049  * queue_work - queue work on a workqueue
1050  * @wq: workqueue to use
1051  * @work: work to queue
1052  *
1053  * Returns 0 if @work was already on a queue, non-zero otherwise.
1054  *
1055  * We queue the work to the CPU on which it was submitted, but if the CPU dies
1056  * it can be processed by another CPU.
1057  */
1058 int queue_work(struct workqueue_struct *wq, struct work_struct *work)
1059 {
1060         int ret;
1061
1062         ret = queue_work_on(get_cpu(), wq, work);
1063         put_cpu();
1064
1065         return ret;
1066 }
1067 EXPORT_SYMBOL_GPL(queue_work);
1068
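/*
 * Usage sketch (editorial illustration, hypothetical caller): a typical user
 * declares a work item, supplies a handler and queues it, most commonly on
 * system_wq:
 *
 *	static void my_work_fn(struct work_struct *work)
 *	{
 *		pr_info("running in process context\n");
 *	}
 *	static DECLARE_WORK(my_work, my_work_fn);
 *
 * and then, from any context:
 *
 *	queue_work(system_wq, &my_work);
 */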
1069 /**
1070  * queue_work_on - queue work on specific cpu
1071  * @cpu: CPU number to execute work on
1072  * @wq: workqueue to use
1073  * @work: work to queue
1074  *
1075  * Returns 0 if @work was already on a queue, non-zero otherwise.
1076  *
1077  * We queue the work to a specific CPU, the caller must ensure it
1078  * can't go away.
1079  */
1080 int
1081 queue_work_on(int cpu, struct workqueue_struct *wq, struct work_struct *work)
1082 {
1083         int ret = 0;
1084
1085         if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
1086                 __queue_work(cpu, wq, work);
1087                 ret = 1;
1088         }
1089         return ret;
1090 }
1091 EXPORT_SYMBOL_GPL(queue_work_on);
1092
1093 static void delayed_work_timer_fn(unsigned long __data)
1094 {
1095         struct delayed_work *dwork = (struct delayed_work *)__data;
1096         struct cpu_workqueue_struct *cwq = get_work_cwq(&dwork->work);
1097
1098         __queue_work(smp_processor_id(), cwq->wq, &dwork->work);
1099 }
1100
1101 /**
1102  * queue_delayed_work - queue work on a workqueue after delay
1103  * @wq: workqueue to use
1104  * @dwork: delayable work to queue
1105  * @delay: number of jiffies to wait before queueing
1106  *
1107  * Returns 0 if @dwork was already on a queue, non-zero otherwise.
1108  */
1109 int queue_delayed_work(struct workqueue_struct *wq,
1110                         struct delayed_work *dwork, unsigned long delay)
1111 {
1112         if (delay == 0)
1113                 return queue_work(wq, &dwork->work);
1114
1115         return queue_delayed_work_on(-1, wq, dwork, delay);
1116 }
1117 EXPORT_SYMBOL_GPL(queue_delayed_work);
1118
1119 /**
1120  * queue_delayed_work_on - queue work on specific CPU after delay
1121  * @cpu: CPU number to execute work on
1122  * @wq: workqueue to use
1123  * @dwork: work to queue
1124  * @delay: number of jiffies to wait before queueing
1125  *
1126  * Returns 0 if @dwork was already on a queue, non-zero otherwise.
1127  */
1128 int queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
1129                         struct delayed_work *dwork, unsigned long delay)
1130 {
1131         int ret = 0;
1132         struct timer_list *timer = &dwork->timer;
1133         struct work_struct *work = &dwork->work;
1134
1135         if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
1136                 unsigned int lcpu;
1137
1138                 BUG_ON(timer_pending(timer));
1139                 BUG_ON(!list_empty(&work->entry));
1140
1141                 timer_stats_timer_set_start_info(&dwork->timer);
1142
1143                 /*
1144                  * This stores cwq for the moment, for the timer_fn.
1145                  * Note that the work's gcwq is preserved to allow
1146                  * reentrance detection for delayed works.
1147                  */
1148                 if (!(wq->flags & WQ_UNBOUND)) {
1149                         struct global_cwq *gcwq = get_work_gcwq(work);
1150
1151                         if (gcwq && gcwq->cpu != WORK_CPU_UNBOUND)
1152                                 lcpu = gcwq->cpu;
1153                         else
1154                                 lcpu = raw_smp_processor_id();
1155                 } else
1156                         lcpu = WORK_CPU_UNBOUND;
1157
1158                 set_work_cwq(work, get_cwq(lcpu, wq), 0);
1159
1160                 timer->expires = jiffies + delay;
1161                 timer->data = (unsigned long)dwork;
1162                 timer->function = delayed_work_timer_fn;
1163
1164                 if (unlikely(cpu >= 0))
1165                         add_timer_on(timer, cpu);
1166                 else
1167                         add_timer(timer);
1168                 ret = 1;
1169         }
1170         return ret;
1171 }
1172 EXPORT_SYMBOL_GPL(queue_delayed_work_on);
1173
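/*
 * Usage sketch (editorial illustration, hypothetical caller):
 *
 *	static void my_dwork_fn(struct work_struct *work);
 *	static DECLARE_DELAYED_WORK(my_dwork, my_dwork_fn);
 *
 *	queue_delayed_work(system_wq, &my_dwork, HZ);
 *
 * queues @my_dwork after roughly one second, while
 *
 *	queue_delayed_work_on(1, system_wq, &my_dwork, 2 * HZ);
 *
 * arms the timer on cpu 1 instead.  A second call while the work is still
 * pending returns 0 and does nothing.
 */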
1174 /**
1175  * worker_enter_idle - enter idle state
1176  * @worker: worker which is entering idle state
1177  *
1178  * @worker is entering idle state.  Update stats and idle timer if
1179  * necessary.
1180  *
1181  * LOCKING:
1182  * spin_lock_irq(gcwq->lock).
1183  */
1184 static void worker_enter_idle(struct worker *worker)
1185 {
1186         struct worker_pool *pool = worker->pool;
1187         struct global_cwq *gcwq = pool->gcwq;
1188
1189         BUG_ON(worker->flags & WORKER_IDLE);
1190         BUG_ON(!list_empty(&worker->entry) &&
1191                (worker->hentry.next || worker->hentry.pprev));
1192
1193         /* can't use worker_set_flags(), also called from start_worker() */
1194         worker->flags |= WORKER_IDLE;
1195         pool->nr_idle++;
1196         worker->last_active = jiffies;
1197
1198         /* idle_list is LIFO */
1199         list_add(&worker->entry, &pool->idle_list);
1200
1201         if (likely(!(worker->flags & WORKER_ROGUE))) {
1202                 if (too_many_workers(pool) && !timer_pending(&pool->idle_timer))
1203                         mod_timer(&pool->idle_timer,
1204                                   jiffies + IDLE_WORKER_TIMEOUT);
1205         } else
1206                 wake_up_all(&gcwq->trustee_wait);
1207
1208         /*
1209          * Sanity check nr_running.  Because trustee releases gcwq->lock
1210          * between setting %WORKER_ROGUE and zapping nr_running, the
1211  * warning may trigger spuriously.  Only check once the trustee is done.
1212          */
1213         WARN_ON_ONCE(gcwq->trustee_state == TRUSTEE_DONE &&
1214                      pool->nr_workers == pool->nr_idle &&
1215                      atomic_read(get_pool_nr_running(pool)));
1216 }
1217
1218 /**
1219  * worker_leave_idle - leave idle state
1220  * @worker: worker which is leaving idle state
1221  *
1222  * @worker is leaving idle state.  Update stats.
1223  *
1224  * LOCKING:
1225  * spin_lock_irq(gcwq->lock).
1226  */
1227 static void worker_leave_idle(struct worker *worker)
1228 {
1229         struct worker_pool *pool = worker->pool;
1230
1231         BUG_ON(!(worker->flags & WORKER_IDLE));
1232         worker_clr_flags(worker, WORKER_IDLE);
1233         pool->nr_idle--;
1234         list_del_init(&worker->entry);
1235 }
1236
1237 /**
1238  * worker_maybe_bind_and_lock - bind worker to its cpu if possible and lock gcwq
1239  * @worker: self
1240  *
1241  * Works which are scheduled while the cpu is online must at least be
1242  * scheduled to a worker which is bound to the cpu so that if they are
1243  * flushed from cpu callbacks while cpu is going down, they are
1244  * guaranteed to execute on the cpu.
1245  *
1246  * This function is to be used by rogue workers and rescuers to bind
1247  * themselves to the target cpu and may race with cpu going down or
1248  * coming online.  kthread_bind() can't be used because it may put the
1249  * worker on an already dead cpu and set_cpus_allowed_ptr() can't be used
1250  * verbatim as it's best effort and blocking and gcwq may be
1251  * [dis]associated in the meantime.
1252  *
1253  * This function tries set_cpus_allowed(), locks gcwq and verifies
1254  * the binding against GCWQ_DISASSOCIATED which is set during
1255  * CPU_DYING and cleared during CPU_ONLINE, so if the worker enters
1256  * idle state or fetches works without dropping lock, it can guarantee
1257  * the scheduling requirement described in the first paragraph.
1258  *
1259  * CONTEXT:
1260  * Might sleep.  Called without any lock but returns with gcwq->lock
1261  * held.
1262  *
1263  * RETURNS:
1264  * %true if the associated gcwq is online (@worker is successfully
1265  * bound), %false if offline.
1266  */
1267 static bool worker_maybe_bind_and_lock(struct worker *worker)
1268 __acquires(&gcwq->lock)
1269 {
1270         struct global_cwq *gcwq = worker->pool->gcwq;
1271         struct task_struct *task = worker->task;
1272
1273         while (true) {
1274                 /*
1275                  * The following call may fail, succeed or succeed
1276                  * without actually migrating the task to the cpu if
1277                  * it races with cpu hotunplug operation.  Verify
1278                  * against GCWQ_DISASSOCIATED.
1279                  */
1280                 if (!(gcwq->flags & GCWQ_DISASSOCIATED))
1281                         set_cpus_allowed_ptr(task, get_cpu_mask(gcwq->cpu));
1282
1283                 spin_lock_irq(&gcwq->lock);
1284                 if (gcwq->flags & GCWQ_DISASSOCIATED)
1285                         return false;
1286                 if (task_cpu(task) == gcwq->cpu &&
1287                     cpumask_equal(&current->cpus_allowed,
1288                                   get_cpu_mask(gcwq->cpu)))
1289                         return true;
1290                 spin_unlock_irq(&gcwq->lock);
1291
1292                 /*
1293                  * We've raced with CPU hot[un]plug.  Give it a breather
1294                  * and retry migration.  cond_resched() is required here;
1295                  * otherwise, we might deadlock against cpu_stop trying to
1296                  * bring down the CPU on non-preemptive kernel.
1297                  */
1298                 cpu_relax();
1299                 cond_resched();
1300         }
1301 }
1302
1303 /*
1304  * Function for worker->rebind_work used to rebind rogue busy workers
1305  * to the associated cpu which is coming back online.  This is
1306  * scheduled by cpu up but can race with other cpu hotplug operations
1307  * and may be executed twice without intervening cpu down.
1308  */
1309 static void worker_rebind_fn(struct work_struct *work)
1310 {
1311         struct worker *worker = container_of(work, struct worker, rebind_work);
1312         struct global_cwq *gcwq = worker->pool->gcwq;
1313
1314         if (worker_maybe_bind_and_lock(worker))
1315                 worker_clr_flags(worker, WORKER_REBIND);
1316
1317         spin_unlock_irq(&gcwq->lock);
1318 }
1319
1320 static struct worker *alloc_worker(void)
1321 {
1322         struct worker *worker;
1323
1324         worker = kzalloc(sizeof(*worker), GFP_KERNEL);
1325         if (worker) {
1326                 INIT_LIST_HEAD(&worker->entry);
1327                 INIT_LIST_HEAD(&worker->scheduled);
1328                 INIT_WORK(&worker->rebind_work, worker_rebind_fn);
1329                 /* on creation a worker is in !idle && prep state */
1330                 worker->flags = WORKER_PREP;
1331         }
1332         return worker;
1333 }
1334
1335 /**
1336  * create_worker - create a new workqueue worker
1337  * @pool: pool the new worker will belong to
1338  * @bind: whether to bind the worker to the pool's cpu or not
1339  *
1340  * Create a new worker which is bound to @pool.  The returned worker
1341  * can be started by calling start_worker() or destroyed using
1342  * destroy_worker().
1343  *
1344  * CONTEXT:
1345  * Might sleep.  Does GFP_KERNEL allocations.
1346  *
1347  * RETURNS:
1348  * Pointer to the newly created worker.
1349  */
1350 static struct worker *create_worker(struct worker_pool *pool, bool bind)
1351 {
1352         struct global_cwq *gcwq = pool->gcwq;
1353         bool on_unbound_cpu = gcwq->cpu == WORK_CPU_UNBOUND;
1354         const char *pri = worker_pool_pri(pool) ? "H" : "";
1355         struct worker *worker = NULL;
1356         int id = -1;
1357
1358         spin_lock_irq(&gcwq->lock);
1359         while (ida_get_new(&pool->worker_ida, &id)) {
1360                 spin_unlock_irq(&gcwq->lock);
1361                 if (!ida_pre_get(&pool->worker_ida, GFP_KERNEL))
1362                         goto fail;
1363                 spin_lock_irq(&gcwq->lock);
1364         }
1365         spin_unlock_irq(&gcwq->lock);
1366
1367         worker = alloc_worker();
1368         if (!worker)
1369                 goto fail;
1370
1371         worker->pool = pool;
1372         worker->id = id;
1373
1374         if (!on_unbound_cpu)
1375                 worker->task = kthread_create_on_node(worker_thread,
1376                                         worker, cpu_to_node(gcwq->cpu),
1377                                         "kworker/%u:%d%s", gcwq->cpu, id, pri);
1378         else
1379                 worker->task = kthread_create(worker_thread, worker,
1380                                               "kworker/u:%d%s", id, pri);
1381         if (IS_ERR(worker->task))
1382                 goto fail;
1383
1384         if (worker_pool_pri(pool))
1385                 set_user_nice(worker->task, HIGHPRI_NICE_LEVEL);
1386
1387         /*
1388          * A rogue worker will become a regular one if CPU comes
1389          * online later on.  Make sure every worker has
1390          * PF_THREAD_BOUND set.
1391          */
1392         if (bind && !on_unbound_cpu)
1393                 kthread_bind(worker->task, gcwq->cpu);
1394         else {
1395                 worker->task->flags |= PF_THREAD_BOUND;
1396                 if (on_unbound_cpu)
1397                         worker->flags |= WORKER_UNBOUND;
1398         }
1399
1400         return worker;
1401 fail:
1402         if (id >= 0) {
1403                 spin_lock_irq(&gcwq->lock);
1404                 ida_remove(&pool->worker_ida, id);
1405                 spin_unlock_irq(&gcwq->lock);
1406         }
1407         kfree(worker);
1408         return NULL;
1409 }
1410
1411 /**
1412  * start_worker - start a newly created worker
1413  * @worker: worker to start
1414  *
1415  * Make the gcwq aware of @worker and start it.
1416  *
1417  * CONTEXT:
1418  * spin_lock_irq(gcwq->lock).
1419  */
1420 static void start_worker(struct worker *worker)
1421 {
1422         worker->flags |= WORKER_STARTED;
1423         worker->pool->nr_workers++;
1424         worker_enter_idle(worker);
1425         wake_up_process(worker->task);
1426 }
1427
1428 /**
1429  * destroy_worker - destroy a workqueue worker
1430  * @worker: worker to be destroyed
1431  *
1432  * Destroy @worker and adjust @gcwq stats accordingly.
1433  *
1434  * CONTEXT:
1435  * spin_lock_irq(gcwq->lock) which is released and regrabbed.
1436  */
1437 static void destroy_worker(struct worker *worker)
1438 {
1439         struct worker_pool *pool = worker->pool;
1440         struct global_cwq *gcwq = pool->gcwq;
1441         int id = worker->id;
1442
1443         /* sanity check frenzy */
1444         BUG_ON(worker->current_work);
1445         BUG_ON(!list_empty(&worker->scheduled));
1446
1447         if (worker->flags & WORKER_STARTED)
1448                 pool->nr_workers--;
1449         if (worker->flags & WORKER_IDLE)
1450                 pool->nr_idle--;
1451
1452         list_del_init(&worker->entry);
1453         worker->flags |= WORKER_DIE;
1454
1455         spin_unlock_irq(&gcwq->lock);
1456
1457         kthread_stop(worker->task);
1458         kfree(worker);
1459
1460         spin_lock_irq(&gcwq->lock);
1461         ida_remove(&pool->worker_ida, id);
1462 }
1463
1464 static void idle_worker_timeout(unsigned long __pool)
1465 {
1466         struct worker_pool *pool = (void *)__pool;
1467         struct global_cwq *gcwq = pool->gcwq;
1468
1469         spin_lock_irq(&gcwq->lock);
1470
1471         if (too_many_workers(pool)) {
1472                 struct worker *worker;
1473                 unsigned long expires;
1474
1475                 /* idle_list is kept in LIFO order, check the last one */
1476                 worker = list_entry(pool->idle_list.prev, struct worker, entry);
1477                 expires = worker->last_active + IDLE_WORKER_TIMEOUT;
1478
1479                 if (time_before(jiffies, expires))
1480                         mod_timer(&pool->idle_timer, expires);
1481                 else {
1482                         /* it's been idle for too long, wake up manager */
1483                         pool->flags |= POOL_MANAGE_WORKERS;
1484                         wake_up_worker(pool);
1485                 }
1486         }
1487
1488         spin_unlock_irq(&gcwq->lock);
1489 }
1490
1491 static bool send_mayday(struct work_struct *work)
1492 {
1493         struct cpu_workqueue_struct *cwq = get_work_cwq(work);
1494         struct workqueue_struct *wq = cwq->wq;
1495         unsigned int cpu;
1496
1497         if (!(wq->flags & WQ_RESCUER))
1498                 return false;
1499
1500         /* mayday mayday mayday */
1501         cpu = cwq->pool->gcwq->cpu;
1502         /* WORK_CPU_UNBOUND can't be set in cpumask, use cpu 0 instead */
1503         if (cpu == WORK_CPU_UNBOUND)
1504                 cpu = 0;
1505         if (!mayday_test_and_set_cpu(cpu, wq->mayday_mask))
1506                 wake_up_process(wq->rescuer->task);
1507         return true;
1508 }
1509
1510 static void gcwq_mayday_timeout(unsigned long __pool)
1511 {
1512         struct worker_pool *pool = (void *)__pool;
1513         struct global_cwq *gcwq = pool->gcwq;
1514         struct work_struct *work;
1515
1516         spin_lock_irq(&gcwq->lock);
1517
1518         if (need_to_create_worker(pool)) {
1519                 /*
1520                  * We've been trying to create a new worker but
1521                  * haven't been successful.  We might be hitting an
1522                  * allocation deadlock.  Send distress signals to
1523                  * rescuers.
1524                  */
1525                 list_for_each_entry(work, &pool->worklist, entry)
1526                         send_mayday(work);
1527         }
1528
1529         spin_unlock_irq(&gcwq->lock);
1530
1531         mod_timer(&pool->mayday_timer, jiffies + MAYDAY_INTERVAL);
1532 }
1533
1534 /**
1535  * maybe_create_worker - create a new worker if necessary
1536  * @pool: pool to create a new worker for
1537  *
1538  * Create a new worker for @pool if necessary.  @pool is guaranteed to
1539  * have at least one idle worker on return from this function.  If
1540  * creating a new worker takes longer than MAYDAY_INTERVAL, mayday is
1541  * sent to all rescuers with works scheduled on @pool to resolve
1542  * possible allocation deadlock.
1543  *
1544  * On return, need_to_create_worker() is guaranteed to be false and
1545  * may_start_working() true.
1546  *
1547  * LOCKING:
1548  * spin_lock_irq(gcwq->lock) which may be released and regrabbed
1549  * multiple times.  Does GFP_KERNEL allocations.  Called only from
1550  * manager.
1551  *
1552  * RETURNS:
1553  * false if no action was taken and gcwq->lock stayed locked, true
1554  * otherwise.
1555  */
1556 static bool maybe_create_worker(struct worker_pool *pool)
1557 __releases(&gcwq->lock)
1558 __acquires(&gcwq->lock)
1559 {
1560         struct global_cwq *gcwq = pool->gcwq;
1561
1562         if (!need_to_create_worker(pool))
1563                 return false;
1564 restart:
1565         spin_unlock_irq(&gcwq->lock);
1566
1567         /* if we don't make progress in MAYDAY_INITIAL_TIMEOUT, call for help */
1568         mod_timer(&pool->mayday_timer, jiffies + MAYDAY_INITIAL_TIMEOUT);
1569
1570         while (true) {
1571                 struct worker *worker;
1572
1573                 worker = create_worker(pool, true);
1574                 if (worker) {
1575                         del_timer_sync(&pool->mayday_timer);
1576                         spin_lock_irq(&gcwq->lock);
1577                         start_worker(worker);
1578                         BUG_ON(need_to_create_worker(pool));
1579                         return true;
1580                 }
1581
1582                 if (!need_to_create_worker(pool))
1583                         break;
1584
1585                 __set_current_state(TASK_INTERRUPTIBLE);
1586                 schedule_timeout(CREATE_COOLDOWN);
1587
1588                 if (!need_to_create_worker(pool))
1589                         break;
1590         }
1591
1592         del_timer_sync(&pool->mayday_timer);
1593         spin_lock_irq(&gcwq->lock);
1594         if (need_to_create_worker(pool))
1595                 goto restart;
1596         return true;
1597 }
1598
1599 /**
1600  * maybe_destroy_workers - destroy workers which have been idle for a while
1601  * @pool: pool to destroy workers for
1602  *
1603  * Destroy @pool workers which have been idle for longer than
1604  * IDLE_WORKER_TIMEOUT.
1605  *
1606  * LOCKING:
1607  * spin_lock_irq(gcwq->lock) which may be released and regrabbed
1608  * multiple times.  Called only from manager.
1609  *
1610  * RETURNS:
1611  * false if no action was taken and gcwq->lock stayed locked, true
1612  * otherwise.
1613  */
1614 static bool maybe_destroy_workers(struct worker_pool *pool)
1615 {
1616         bool ret = false;
1617
1618         while (too_many_workers(pool)) {
1619                 struct worker *worker;
1620                 unsigned long expires;
1621
1622                 worker = list_entry(pool->idle_list.prev, struct worker, entry);
1623                 expires = worker->last_active + IDLE_WORKER_TIMEOUT;
1624
1625                 if (time_before(jiffies, expires)) {
1626                         mod_timer(&pool->idle_timer, expires);
1627                         break;
1628                 }
1629
1630                 destroy_worker(worker);
1631                 ret = true;
1632         }
1633
1634         return ret;
1635 }
1636
1637 /**
1638  * manage_workers - manage worker pool
1639  * @worker: self
1640  *
1641  * Assume the manager role and manage gcwq worker pool @worker belongs
1642  * to.  At any given time, there can be only zero or one manager per
1643  * worker pool.  The exclusion is handled automatically by this function.
1644  *
1645  * The caller can safely start processing works on false return.  On
1646  * true return, it's guaranteed that need_to_create_worker() is false
1647  * and may_start_working() is true.
1648  *
1649  * CONTEXT:
1650  * spin_lock_irq(gcwq->lock) which may be released and regrabbed
1651  * multiple times.  Does GFP_KERNEL allocations.
1652  *
1653  * RETURNS:
1654  * false if no action was taken and gcwq->lock stayed locked, true if
1655  * some action was taken.
1656  */
1657 static bool manage_workers(struct worker *worker)
1658 {
1659         struct worker_pool *pool = worker->pool;
1660         struct global_cwq *gcwq = pool->gcwq;
1661         bool ret = false;
1662
1663         if (pool->flags & POOL_MANAGING_WORKERS)
1664                 return ret;
1665
1666         pool->flags &= ~POOL_MANAGE_WORKERS;
1667         pool->flags |= POOL_MANAGING_WORKERS;
1668
1669         /*
1670          * Destroy and then create so that may_start_working() is true
1671          * on return.
1672          */
1673         ret |= maybe_destroy_workers(pool);
1674         ret |= maybe_create_worker(pool);
1675
1676         pool->flags &= ~POOL_MANAGING_WORKERS;
1677
1678         /*
1679          * The trustee might be waiting to take over the manager
1680          * position, tell it we're done.
1681          */
1682         if (unlikely(gcwq->trustee))
1683                 wake_up_all(&gcwq->trustee_wait);
1684
1685         return ret;
1686 }
1687
1688 /**
1689  * move_linked_works - move linked works to a list
1690  * @work: start of series of works to be scheduled
1691  * @head: target list to append @work to
1692  * @nextp: out parameter for nested worklist walking
1693  *
1694  * Schedule linked works starting from @work to @head.  Work series to
1695  * be scheduled starts at @work and includes any consecutive work with
1696  * WORK_STRUCT_LINKED set in its predecessor.
1697  *
1698  * If @nextp is not NULL, it's updated to point to the next work of
1699  * the last scheduled work.  This allows move_linked_works() to be
1700  * nested inside outer list_for_each_entry_safe().
1701  *
1702  * CONTEXT:
1703  * spin_lock_irq(gcwq->lock).
1704  */
1705 static void move_linked_works(struct work_struct *work, struct list_head *head,
1706                               struct work_struct **nextp)
1707 {
1708         struct work_struct *n;
1709
1710         /*
1711          * Linked worklist will always end before the end of the list,
1712          * use NULL for list head.
1713          */
1714         list_for_each_entry_safe_from(work, n, NULL, entry) {
1715                 list_move_tail(&work->entry, head);
1716                 if (!(*work_data_bits(work) & WORK_STRUCT_LINKED))
1717                         break;
1718         }
1719
1720         /*
1721          * If we're already inside safe list traversal and have moved
1722          * multiple works to the scheduled queue, the next position
1723          * needs to be updated.
1724          */
1725         if (nextp)
1726                 *nextp = n;
1727 }
1728
1729 static void cwq_activate_first_delayed(struct cpu_workqueue_struct *cwq)
1730 {
1731         struct work_struct *work = list_first_entry(&cwq->delayed_works,
1732                                                     struct work_struct, entry);
1733
1734         trace_workqueue_activate_work(work);
1735         move_linked_works(work, &cwq->pool->worklist, NULL);
1736         __clear_bit(WORK_STRUCT_DELAYED_BIT, work_data_bits(work));
1737         cwq->nr_active++;
1738 }
1739
1740 /**
1741  * cwq_dec_nr_in_flight - decrement cwq's nr_in_flight
1742  * @cwq: cwq of interest
1743  * @color: color of work which left the queue
1744  * @delayed: for a delayed work
1745  *
1746  * A work has either completed or been removed from the pending queue;
1747  * decrement nr_in_flight of its cwq and handle workqueue flushing.
1748  *
1749  * CONTEXT:
1750  * spin_lock_irq(gcwq->lock).
1751  */
1752 static void cwq_dec_nr_in_flight(struct cpu_workqueue_struct *cwq, int color,
1753                                  bool delayed)
1754 {
1755         /* ignore uncolored works */
1756         if (color == WORK_NO_COLOR)
1757                 return;
1758
1759         cwq->nr_in_flight[color]--;
1760
1761         if (!delayed) {
1762                 cwq->nr_active--;
1763                 if (!list_empty(&cwq->delayed_works)) {
1764                         /* one down, submit a delayed one */
1765                         if (cwq->nr_active < cwq->max_active)
1766                                 cwq_activate_first_delayed(cwq);
1767                 }
1768         }
1769
1770         /* is flush in progress and are we at the flushing tip? */
1771         if (likely(cwq->flush_color != color))
1772                 return;
1773
1774         /* are there still in-flight works? */
1775         if (cwq->nr_in_flight[color])
1776                 return;
1777
1778         /* this cwq is done, clear flush_color */
1779         cwq->flush_color = -1;
1780
1781         /*
1782          * If this was the last cwq, wake up the first flusher.  It
1783          * will handle the rest.
1784          */
1785         if (atomic_dec_and_test(&cwq->wq->nr_cwqs_to_flush))
1786                 complete(&cwq->wq->first_flusher->done);
1787 }
1788
1789 /**
1790  * process_one_work - process single work
1791  * @worker: self
1792  * @work: work to process
1793  *
1794  * Process @work.  This function contains all the logic necessary to
1795  * process a single work including synchronization against and
1796  * interaction with other workers on the same cpu, queueing and
1797  * flushing.  As long as the context requirement is met, any worker can
1798  * call this function to process a work.
1799  *
1800  * CONTEXT:
1801  * spin_lock_irq(gcwq->lock) which is released and regrabbed.
1802  */
1803 static void process_one_work(struct worker *worker, struct work_struct *work)
1804 __releases(&gcwq->lock)
1805 __acquires(&gcwq->lock)
1806 {
1807         struct cpu_workqueue_struct *cwq = get_work_cwq(work);
1808         struct worker_pool *pool = worker->pool;
1809         struct global_cwq *gcwq = pool->gcwq;
1810         struct hlist_head *bwh = busy_worker_head(gcwq, work);
1811         bool cpu_intensive = cwq->wq->flags & WQ_CPU_INTENSIVE;
1812         work_func_t f = work->func;
1813         int work_color;
1814         struct worker *collision;
1815 #ifdef CONFIG_LOCKDEP
1816         /*
1817          * It is permissible to free the struct work_struct from
1818          * inside the function that is called from it, this we need to
1819          * take into account for lockdep too.  To avoid bogus "held
1820          * lock freed" warnings as well as problems when looking into
1821          * work->lockdep_map, make a copy and use that here.
1822          */
1823         struct lockdep_map lockdep_map;
1824
1825         lockdep_copy_map(&lockdep_map, &work->lockdep_map);
1826 #endif
1827         /*
1828          * A single work shouldn't be executed concurrently by
1829          * multiple workers on a single cpu.  Check whether anyone is
1830          * already processing the work.  If so, defer the work to the
1831          * currently executing one.
1832          */
1833         collision = __find_worker_executing_work(gcwq, bwh, work);
1834         if (unlikely(collision)) {
1835                 move_linked_works(work, &collision->scheduled, NULL);
1836                 return;
1837         }
1838
1839         /* claim and process */
1840         debug_work_deactivate(work);
1841         hlist_add_head(&worker->hentry, bwh);
1842         worker->current_work = work;
1843         worker->current_cwq = cwq;
1844         work_color = get_work_color(work);
1845
1846         /* record the current cpu number in the work data and dequeue */
1847         set_work_cpu(work, gcwq->cpu);
1848         list_del_init(&work->entry);
1849
1850         /*
1851          * CPU intensive works don't participate in concurrency
1852          * management.  They're the scheduler's responsibility.
1853          */
1854         if (unlikely(cpu_intensive))
1855                 worker_set_flags(worker, WORKER_CPU_INTENSIVE, true);
1856
1857         /*
1858          * Unbound gcwq isn't concurrency managed and work items should be
1859          * executed ASAP.  Wake up another worker if necessary.
1860          */
1861         if ((worker->flags & WORKER_UNBOUND) && need_more_worker(pool))
1862                 wake_up_worker(pool);
1863
1864         spin_unlock_irq(&gcwq->lock);
1865
1866         work_clear_pending(work);
1867         lock_map_acquire_read(&cwq->wq->lockdep_map);
1868         lock_map_acquire(&lockdep_map);
1869         trace_workqueue_execute_start(work);
1870         f(work);
1871         /*
1872          * While we must be careful to not use "work" after this, the trace
1873          * point will only record its address.
1874          */
1875         trace_workqueue_execute_end(work);
1876         lock_map_release(&lockdep_map);
1877         lock_map_release(&cwq->wq->lockdep_map);
1878
1879         if (unlikely(in_atomic() || lockdep_depth(current) > 0)) {
1880                 printk(KERN_ERR "BUG: workqueue leaked lock or atomic: "
1881                        "%s/0x%08x/%d\n",
1882                        current->comm, preempt_count(), task_pid_nr(current));
1883                 printk(KERN_ERR "    last function: ");
1884                 print_symbol("%s\n", (unsigned long)f);
1885                 debug_show_held_locks(current);
1886                 dump_stack();
1887         }
1888
1889         spin_lock_irq(&gcwq->lock);
1890
1891         /* clear cpu intensive status */
1892         if (unlikely(cpu_intensive))
1893                 worker_clr_flags(worker, WORKER_CPU_INTENSIVE);
1894
1895         /* we're done with it, release */
1896         hlist_del_init(&worker->hentry);
1897         worker->current_work = NULL;
1898         worker->current_cwq = NULL;
1899         cwq_dec_nr_in_flight(cwq, work_color, false);
1900 }
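
/*
 * Illustrative sketch (not part of the original file): how a workqueue user
 * would mark long-running, CPU-bound work items so that they are excluded
 * from concurrency management as described above.  The names
 * example_crunch_wq, example_crunch() and example_start_crunching() are
 * hypothetical.
 */
static struct workqueue_struct *example_crunch_wq;

static void example_crunch(struct work_struct *work)
{
	/* long, CPU-bound computation; runs with WORKER_CPU_INTENSIVE set */
}

static DECLARE_WORK(example_crunch_work, example_crunch);

static int example_start_crunching(void)
{
	/*
	 * WQ_CPU_INTENSIVE items are left to the scheduler and don't stall
	 * concurrency management for other per-cpu work items.
	 */
	example_crunch_wq = alloc_workqueue("example_crunch",
					    WQ_CPU_INTENSIVE, 0);
	if (!example_crunch_wq)
		return -ENOMEM;
	queue_work(example_crunch_wq, &example_crunch_work);
	return 0;
}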
1901
1902 /**
1903  * process_scheduled_works - process scheduled works
1904  * @worker: self
1905  *
1906  * Process all scheduled works.  Please note that the scheduled list
1907  * may change while processing a work, so this function repeatedly
1908  * fetches a work from the top and executes it.
1909  *
1910  * CONTEXT:
1911  * spin_lock_irq(gcwq->lock) which may be released and regrabbed
1912  * multiple times.
1913  */
1914 static void process_scheduled_works(struct worker *worker)
1915 {
1916         while (!list_empty(&worker->scheduled)) {
1917                 struct work_struct *work = list_first_entry(&worker->scheduled,
1918                                                 struct work_struct, entry);
1919                 process_one_work(worker, work);
1920         }
1921 }
1922
1923 /**
1924  * worker_thread - the worker thread function
1925  * @__worker: self
1926  *
1927  * The gcwq worker thread function.  There's a dynamic pool of these
1928  * workers for each of the gcwq's worker_pools on each cpu.  These
1929  * workers process all works regardless of their specific target
1930  * workqueue.  The only exception is works which belong to workqueues
1931  * with a rescuer, which will be explained in rescuer_thread().
1932  */
1933 static int worker_thread(void *__worker)
1934 {
1935         struct worker *worker = __worker;
1936         struct worker_pool *pool = worker->pool;
1937         struct global_cwq *gcwq = pool->gcwq;
1938
1939         /* tell the scheduler that this is a workqueue worker */
1940         worker->task->flags |= PF_WQ_WORKER;
1941 woke_up:
1942         spin_lock_irq(&gcwq->lock);
1943
1944         /* DIE can be set only while we're idle, checking here is enough */
1945         if (worker->flags & WORKER_DIE) {
1946                 spin_unlock_irq(&gcwq->lock);
1947                 worker->task->flags &= ~PF_WQ_WORKER;
1948                 return 0;
1949         }
1950
1951         worker_leave_idle(worker);
1952 recheck:
1953         /* no more worker necessary? */
1954         if (!need_more_worker(pool))
1955                 goto sleep;
1956
1957         /* do we need to manage? */
1958         if (unlikely(!may_start_working(pool)) && manage_workers(worker))
1959                 goto recheck;
1960
1961         /*
1962          * ->scheduled list can only be filled while a worker is
1963          * preparing to process a work or actually processing it.
1964          * Make sure nobody diddled with it while I was sleeping.
1965          */
1966         BUG_ON(!list_empty(&worker->scheduled));
1967
1968         /*
1969          * When control reaches this point, we're guaranteed to have
1970          * at least one idle worker or that someone else has already
1971          * assumed the manager role.
1972          */
1973         worker_clr_flags(worker, WORKER_PREP);
1974
1975         do {
1976                 struct work_struct *work =
1977                         list_first_entry(&pool->worklist,
1978                                          struct work_struct, entry);
1979
1980                 if (likely(!(*work_data_bits(work) & WORK_STRUCT_LINKED))) {
1981                         /* optimization path, not strictly necessary */
1982                         process_one_work(worker, work);
1983                         if (unlikely(!list_empty(&worker->scheduled)))
1984                                 process_scheduled_works(worker);
1985                 } else {
1986                         move_linked_works(work, &worker->scheduled, NULL);
1987                         process_scheduled_works(worker);
1988                 }
1989         } while (keep_working(pool));
1990
1991         worker_set_flags(worker, WORKER_PREP, false);
1992 sleep:
1993         if (unlikely(need_to_manage_workers(pool)) && manage_workers(worker))
1994                 goto recheck;
1995
1996         /*
1997          * gcwq->lock is held and there's no work to process and no
1998          * need to manage, sleep.  Workers are woken up only while
1999          * holding gcwq->lock or from local cpu, so setting the
2000          * current state before releasing gcwq->lock is enough to
2001          * prevent losing any event.
2002          */
2003         worker_enter_idle(worker);
2004         __set_current_state(TASK_INTERRUPTIBLE);
2005         spin_unlock_irq(&gcwq->lock);
2006         schedule();
2007         goto woke_up;
2008 }
2009
2010 /**
2011  * rescuer_thread - the rescuer thread function
2012  * @__wq: the associated workqueue
2013  *
2014  * Workqueue rescuer thread function.  There's one rescuer for each
2015  * workqueue which has WQ_RESCUER set.
2016  *
2017  * Regular work processing on a gcwq may block trying to create a new
2018  * worker, which uses a GFP_KERNEL allocation and thus has a slight
2019  * chance of developing into a deadlock if some works currently on the
2020  * same queue need to be processed to satisfy that GFP_KERNEL
2021  * allocation.  This is the problem the rescuer solves.
2022  *
2023  * When such a condition is possible, the gcwq summons the rescuers of
2024  * all workqueues which have works queued on the gcwq and lets them
2025  * process those works so that forward progress can be guaranteed.
2026  *
2027  * This should happen rarely.
2028  */
2029 static int rescuer_thread(void *__wq)
2030 {
2031         struct workqueue_struct *wq = __wq;
2032         struct worker *rescuer = wq->rescuer;
2033         struct list_head *scheduled = &rescuer->scheduled;
2034         bool is_unbound = wq->flags & WQ_UNBOUND;
2035         unsigned int cpu;
2036
2037         set_user_nice(current, RESCUER_NICE_LEVEL);
2038 repeat:
2039         set_current_state(TASK_INTERRUPTIBLE);
2040
2041         if (kthread_should_stop())
2042                 return 0;
2043
2044         /*
2045          * See whether any cpu is asking for help.  Unbound
2046          * workqueues use cpu 0 in mayday_mask for WORK_CPU_UNBOUND.
2047          */
2048         for_each_mayday_cpu(cpu, wq->mayday_mask) {
2049                 unsigned int tcpu = is_unbound ? WORK_CPU_UNBOUND : cpu;
2050                 struct cpu_workqueue_struct *cwq = get_cwq(tcpu, wq);
2051                 struct worker_pool *pool = cwq->pool;
2052                 struct global_cwq *gcwq = pool->gcwq;
2053                 struct work_struct *work, *n;
2054
2055                 __set_current_state(TASK_RUNNING);
2056                 mayday_clear_cpu(cpu, wq->mayday_mask);
2057
2058                 /* migrate to the target cpu if possible */
2059                 rescuer->pool = pool;
2060                 worker_maybe_bind_and_lock(rescuer);
2061
2062                 /*
2063                  * Slurp in all works issued via this workqueue and
2064                  * process'em.
2065                  */
2066                 BUG_ON(!list_empty(&rescuer->scheduled));
2067                 list_for_each_entry_safe(work, n, &pool->worklist, entry)
2068                         if (get_work_cwq(work) == cwq)
2069                                 move_linked_works(work, scheduled, &n);
2070
2071                 process_scheduled_works(rescuer);
2072
2073                 /*
2074                  * Leave this gcwq.  If keep_working() is %true, notify a
2075                  * regular worker; otherwise, we end up with 0 concurrency
2076                  * and stalling the execution.
2077                  */
2078                 if (keep_working(pool))
2079                         wake_up_worker(pool);
2080
2081                 spin_unlock_irq(&gcwq->lock);
2082         }
2083
2084         schedule();
2085         goto repeat;
2086 }
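
/*
 * Illustrative sketch (not part of the original file): a workqueue that may
 * be used during memory reclaim should be allocated with WQ_MEM_RECLAIM,
 * which (as set up in __alloc_workqueue_key()) implies WQ_RESCUER and thus a
 * dedicated rescuer thread.  example_writeback_wq, example_writeback_fn()
 * and example_writeback_setup() are hypothetical names.
 */
static struct workqueue_struct *example_writeback_wq;

static void example_writeback_fn(struct work_struct *work)
{
	/* pushes dirty data out; may be needed for reclaim to make progress */
}

static DECLARE_WORK(example_writeback_work, example_writeback_fn);

static int example_writeback_setup(void)
{
	/* max_active of 1 limits this workqueue to one in-flight item per cpu */
	example_writeback_wq = alloc_workqueue("example_writeback",
					       WQ_MEM_RECLAIM, 1);
	if (!example_writeback_wq)
		return -ENOMEM;
	queue_work(example_writeback_wq, &example_writeback_work);
	return 0;
}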
2087
2088 struct wq_barrier {
2089         struct work_struct      work;
2090         struct completion       done;
2091 };
2092
2093 static void wq_barrier_func(struct work_struct *work)
2094 {
2095         struct wq_barrier *barr = container_of(work, struct wq_barrier, work);
2096         complete(&barr->done);
2097 }
2098
2099 /**
2100  * insert_wq_barrier - insert a barrier work
2101  * @cwq: cwq to insert barrier into
2102  * @barr: wq_barrier to insert
2103  * @target: target work to attach @barr to
2104  * @worker: worker currently executing @target, NULL if @target is not executing
2105  *
2106  * @barr is linked to @target such that @barr is completed only after
2107  * @target finishes execution.  Please note that the ordering
2108  * guarantee is observed only with respect to @target and on the local
2109  * cpu.
2110  *
2111  * Currently, a queued barrier can't be canceled.  This is because
2112  * try_to_grab_pending() can't determine whether the work to be
2113  * grabbed is at the head of the queue and thus can't clear LINKED
2114  * flag of the previous work while there must be a valid next work
2115  * after a work with LINKED flag set.
2116  *
2117  * Note that when @worker is non-NULL, @target may be modified
2118  * underneath us, so we can't reliably determine cwq from @target.
2119  *
2120  * CONTEXT:
2121  * spin_lock_irq(gcwq->lock).
2122  */
2123 static void insert_wq_barrier(struct cpu_workqueue_struct *cwq,
2124                               struct wq_barrier *barr,
2125                               struct work_struct *target, struct worker *worker)
2126 {
2127         struct list_head *head;
2128         unsigned int linked = 0;
2129
2130         /*
2131          * debugobject calls are safe here even with gcwq->lock locked
2132          * as we know for sure that this will not trigger any of the
2133          * checks and call back into the fixup functions where we
2134          * might deadlock.
2135          */
2136         INIT_WORK_ONSTACK(&barr->work, wq_barrier_func);
2137         __set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(&barr->work));
2138         init_completion(&barr->done);
2139
2140         /*
2141          * If @target is currently being executed, schedule the
2142          * barrier to the worker; otherwise, put it after @target.
2143          */
2144         if (worker)
2145                 head = worker->scheduled.next;
2146         else {
2147                 unsigned long *bits = work_data_bits(target);
2148
2149                 head = target->entry.next;
2150                 /* there can already be other linked works, inherit and set */
2151                 linked = *bits & WORK_STRUCT_LINKED;
2152                 __set_bit(WORK_STRUCT_LINKED_BIT, bits);
2153         }
2154
2155         debug_work_activate(&barr->work);
2156         insert_work(cwq, &barr->work, head,
2157                     work_color_to_flags(WORK_NO_COLOR) | linked);
2158 }
2159
2160 /**
2161  * flush_workqueue_prep_cwqs - prepare cwqs for workqueue flushing
2162  * @wq: workqueue being flushed
2163  * @flush_color: new flush color, < 0 for no-op
2164  * @work_color: new work color, < 0 for no-op
2165  *
2166  * Prepare cwqs for workqueue flushing.
2167  *
2168  * If @flush_color is non-negative, flush_color on all cwqs should be
2169  * -1.  If no cwq has in-flight commands at the specified color, all
2170  * cwq->flush_color's stay at -1 and %false is returned.  If any cwq
2171  * has in flight commands, its cwq->flush_color is set to
2172  * @flush_color, @wq->nr_cwqs_to_flush is updated accordingly, cwq
2173  * wakeup logic is armed and %true is returned.
2174  *
2175  * The caller should have initialized @wq->first_flusher prior to
2176  * calling this function with non-negative @flush_color.  If
2177  * @flush_color is negative, no flush color update is done and %false
2178  * is returned.
2179  *
2180  * If @work_color is non-negative, all cwqs should have the same
2181  * work_color which is previous to @work_color and all will be
2182  * advanced to @work_color.
2183  *
2184  * CONTEXT:
2185  * mutex_lock(wq->flush_mutex).
2186  *
2187  * RETURNS:
2188  * %true if @flush_color >= 0 and there's something to flush.  %false
2189  * otherwise.
2190  */
2191 static bool flush_workqueue_prep_cwqs(struct workqueue_struct *wq,
2192                                       int flush_color, int work_color)
2193 {
2194         bool wait = false;
2195         unsigned int cpu;
2196
2197         if (flush_color >= 0) {
2198                 BUG_ON(atomic_read(&wq->nr_cwqs_to_flush));
2199                 atomic_set(&wq->nr_cwqs_to_flush, 1);
2200         }
2201
2202         for_each_cwq_cpu(cpu, wq) {
2203                 struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
2204                 struct global_cwq *gcwq = cwq->pool->gcwq;
2205
2206                 spin_lock_irq(&gcwq->lock);
2207
2208                 if (flush_color >= 0) {
2209                         BUG_ON(cwq->flush_color != -1);
2210
2211                         if (cwq->nr_in_flight[flush_color]) {
2212                                 cwq->flush_color = flush_color;
2213                                 atomic_inc(&wq->nr_cwqs_to_flush);
2214                                 wait = true;
2215                         }
2216                 }
2217
2218                 if (work_color >= 0) {
2219                         BUG_ON(work_color != work_next_color(cwq->work_color));
2220                         cwq->work_color = work_color;
2221                 }
2222
2223                 spin_unlock_irq(&gcwq->lock);
2224         }
2225
2226         if (flush_color >= 0 && atomic_dec_and_test(&wq->nr_cwqs_to_flush))
2227                 complete(&wq->first_flusher->done);
2228
2229         return wait;
2230 }
2231
2232 /**
2233  * flush_workqueue - ensure that any scheduled work has run to completion.
2234  * @wq: workqueue to flush
2235  *
2236  * Forces execution of the workqueue and blocks until its completion.
2237  * This is typically used in driver shutdown handlers.
2238  *
2239  * We sleep until all works which were queued on entry have been handled,
2240  * but we are not livelocked by new incoming ones.
2241  */
2242 void flush_workqueue(struct workqueue_struct *wq)
2243 {
2244         struct wq_flusher this_flusher = {
2245                 .list = LIST_HEAD_INIT(this_flusher.list),
2246                 .flush_color = -1,
2247                 .done = COMPLETION_INITIALIZER_ONSTACK(this_flusher.done),
2248         };
2249         int next_color;
2250
2251         lock_map_acquire(&wq->lockdep_map);
2252         lock_map_release(&wq->lockdep_map);
2253
2254         mutex_lock(&wq->flush_mutex);
2255
2256         /*
2257          * Start-to-wait phase
2258          */
2259         next_color = work_next_color(wq->work_color);
2260
2261         if (next_color != wq->flush_color) {
2262                 /*
2263                  * Color space is not full.  The current work_color
2264                  * becomes our flush_color and work_color is advanced
2265                  * by one.
2266                  */
2267                 BUG_ON(!list_empty(&wq->flusher_overflow));
2268                 this_flusher.flush_color = wq->work_color;
2269                 wq->work_color = next_color;
2270
2271                 if (!wq->first_flusher) {
2272                         /* no flush in progress, become the first flusher */
2273                         BUG_ON(wq->flush_color != this_flusher.flush_color);
2274
2275                         wq->first_flusher = &this_flusher;
2276
2277                         if (!flush_workqueue_prep_cwqs(wq, wq->flush_color,
2278                                                        wq->work_color)) {
2279                                 /* nothing to flush, done */
2280                                 wq->flush_color = next_color;
2281                                 wq->first_flusher = NULL;
2282                                 goto out_unlock;
2283                         }
2284                 } else {
2285                         /* wait in queue */
2286                         BUG_ON(wq->flush_color == this_flusher.flush_color);
2287                         list_add_tail(&this_flusher.list, &wq->flusher_queue);
2288                         flush_workqueue_prep_cwqs(wq, -1, wq->work_color);
2289                 }
2290         } else {
2291                 /*
2292                  * Oops, color space is full, wait on overflow queue.
2293                  * The next flush completion will assign us
2294                  * flush_color and transfer to flusher_queue.
2295                  */
2296                 list_add_tail(&this_flusher.list, &wq->flusher_overflow);
2297         }
2298
2299         mutex_unlock(&wq->flush_mutex);
2300
2301         wait_for_completion(&this_flusher.done);
2302
2303         /*
2304          * Wake-up-and-cascade phase
2305          *
2306          * First flushers are responsible for cascading flushes and
2307          * handling overflow.  Non-first flushers can simply return.
2308          */
2309         if (wq->first_flusher != &this_flusher)
2310                 return;
2311
2312         mutex_lock(&wq->flush_mutex);
2313
2314         /* we might have raced, check again with mutex held */
2315         if (wq->first_flusher != &this_flusher)
2316                 goto out_unlock;
2317
2318         wq->first_flusher = NULL;
2319
2320         BUG_ON(!list_empty(&this_flusher.list));
2321         BUG_ON(wq->flush_color != this_flusher.flush_color);
2322
2323         while (true) {
2324                 struct wq_flusher *next, *tmp;
2325
2326                 /* complete all the flushers sharing the current flush color */
2327                 list_for_each_entry_safe(next, tmp, &wq->flusher_queue, list) {
2328                         if (next->flush_color != wq->flush_color)
2329                                 break;
2330                         list_del_init(&next->list);
2331                         complete(&next->done);
2332                 }
2333
2334                 BUG_ON(!list_empty(&wq->flusher_overflow) &&
2335                        wq->flush_color != work_next_color(wq->work_color));
2336
2337                 /* this flush_color is finished, advance by one */
2338                 wq->flush_color = work_next_color(wq->flush_color);
2339
2340                 /* one color has been freed, handle overflow queue */
2341                 if (!list_empty(&wq->flusher_overflow)) {
2342                         /*
2343                          * Assign the same color to all overflowed
2344                          * flushers, advance work_color and append to
2345                          * flusher_queue.  This is the start-to-wait
2346                          * phase for these overflowed flushers.
2347                          */
2348                         list_for_each_entry(tmp, &wq->flusher_overflow, list)
2349                                 tmp->flush_color = wq->work_color;
2350
2351                         wq->work_color = work_next_color(wq->work_color);
2352
2353                         list_splice_tail_init(&wq->flusher_overflow,
2354                                               &wq->flusher_queue);
2355                         flush_workqueue_prep_cwqs(wq, -1, wq->work_color);
2356                 }
2357
2358                 if (list_empty(&wq->flusher_queue)) {
2359                         BUG_ON(wq->flush_color != wq->work_color);
2360                         break;
2361                 }
2362
2363                 /*
2364                  * Need to flush more colors.  Make the next flusher
2365                  * the new first flusher and arm cwqs.
2366                  */
2367                 BUG_ON(wq->flush_color == wq->work_color);
2368                 BUG_ON(wq->flush_color != next->flush_color);
2369
2370                 list_del_init(&next->list);
2371                 wq->first_flusher = next;
2372
2373                 if (flush_workqueue_prep_cwqs(wq, wq->flush_color, -1))
2374                         break;
2375
2376                 /*
2377                  * Meh... this color is already done, clear first
2378                  * flusher and repeat cascading.
2379                  */
2380                 wq->first_flusher = NULL;
2381         }
2382
2383 out_unlock:
2384         mutex_unlock(&wq->flush_mutex);
2385 }
2386 EXPORT_SYMBOL_GPL(flush_workqueue);
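
/*
 * Illustrative sketch (not part of the original file): the typical shutdown
 * pattern flush_workqueue() is meant for.  A driver flushes its own
 * workqueue before tearing down the state its work items operate on.  The
 * names example_wq, example_dev_init() and example_dev_shutdown() are
 * hypothetical.
 */
static struct workqueue_struct *example_wq;

static int example_dev_init(void)
{
	/* WQ_HIGHPRI sends this wq's items to the high-priority worker_pool */
	example_wq = alloc_workqueue("example", WQ_HIGHPRI, 0);
	return example_wq ? 0 : -ENOMEM;
}

static void example_dev_shutdown(void)
{
	/*
	 * Every work item queued before this point has finished once
	 * flush_workqueue() returns; the caller must stop producers first
	 * since new items could still be queued afterwards.
	 */
	flush_workqueue(example_wq);
	destroy_workqueue(example_wq);
}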
2387
2388 /**
2389  * drain_workqueue - drain a workqueue
2390  * @wq: workqueue to drain
2391  *
2392  * Wait until the workqueue becomes empty.  While draining is in progress,
2393  * only chain queueing is allowed.  IOW, only currently pending or running
2394  * work items on @wq can queue further work items on it.  @wq is flushed
2395  * repeatedly until it becomes empty.  The number of flushes is determined
2396  * by the depth of chaining and should be relatively short.  Whine if it
2397  * takes too long.
2398  */
2399 void drain_workqueue(struct workqueue_struct *wq)
2400 {
2401         unsigned int flush_cnt = 0;
2402         unsigned int cpu;
2403
2404         /*
2405          * __queue_work() needs to test whether there are drainers; it is
2406          * much hotter than drain_workqueue() and already looks at @wq->flags.
2407          * Use WQ_DRAINING so that queueing doesn't have to check nr_drainers.
2408          */
2409         spin_lock(&workqueue_lock);
2410         if (!wq->nr_drainers++)
2411                 wq->flags |= WQ_DRAINING;
2412         spin_unlock(&workqueue_lock);
2413 reflush:
2414         flush_workqueue(wq);
2415
2416         for_each_cwq_cpu(cpu, wq) {
2417                 struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
2418                 bool drained;
2419
2420                 spin_lock_irq(&cwq->pool->gcwq->lock);
2421                 drained = !cwq->nr_active && list_empty(&cwq->delayed_works);
2422                 spin_unlock_irq(&cwq->pool->gcwq->lock);
2423
2424                 if (drained)
2425                         continue;
2426
2427                 if (++flush_cnt == 10 ||
2428                     (flush_cnt % 100 == 0 && flush_cnt <= 1000))
2429                         pr_warning("workqueue %s: flush on destruction isn't complete after %u tries\n",
2430                                    wq->name, flush_cnt);
2431                 goto reflush;
2432         }
2433
2434         spin_lock(&workqueue_lock);
2435         if (!--wq->nr_drainers)
2436                 wq->flags &= ~WQ_DRAINING;
2437         spin_unlock(&workqueue_lock);
2438 }
2439 EXPORT_SYMBOL_GPL(drain_workqueue);
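
/*
 * Illustrative sketch (not part of the original file): drain_workqueue() is
 * what a caller uses when work items may re-queue themselves (chain
 * queueing), which a single flush_workqueue() cannot catch.  The names
 * example_chain_wq, example_chain_fn(), example_remaining and
 * example_chain_teardown() are hypothetical.
 */
static struct workqueue_struct *example_chain_wq;
static atomic_t example_remaining = ATOMIC_INIT(8);
static void example_chain_fn(struct work_struct *work);
static DECLARE_WORK(example_chain_work, example_chain_fn);

static void example_chain_fn(struct work_struct *work)
{
	/* chain queueing: a running item re-queues itself on the same wq */
	if (atomic_dec_return(&example_remaining) > 0)
		queue_work(example_chain_wq, work);
}

static void example_chain_teardown(void)
{
	/* flushes repeatedly until the chain dies out, then returns */
	drain_workqueue(example_chain_wq);
	destroy_workqueue(example_chain_wq);
}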
2440
2441 static bool start_flush_work(struct work_struct *work, struct wq_barrier *barr,
2442                              bool wait_executing)
2443 {
2444         struct worker *worker = NULL;
2445         struct global_cwq *gcwq;
2446         struct cpu_workqueue_struct *cwq;
2447
2448         might_sleep();
2449         gcwq = get_work_gcwq(work);
2450         if (!gcwq)
2451                 return false;
2452
2453         spin_lock_irq(&gcwq->lock);
2454         if (!list_empty(&work->entry)) {
2455                 /*
2456                  * See the comment near try_to_grab_pending()->smp_rmb().
2457                  * If it was re-queued to a different gcwq under us, we
2458                  * are not going to wait.
2459                  */
2460                 smp_rmb();
2461                 cwq = get_work_cwq(work);
2462                 if (unlikely(!cwq || gcwq != cwq->pool->gcwq))
2463                         goto already_gone;
2464         } else if (wait_executing) {
2465                 worker = find_worker_executing_work(gcwq, work);
2466                 if (!worker)
2467                         goto already_gone;
2468                 cwq = worker->current_cwq;
2469         } else
2470                 goto already_gone;
2471
2472         insert_wq_barrier(cwq, barr, work, worker);
2473         spin_unlock_irq(&gcwq->lock);
2474
2475         /*
2476          * If @max_active is 1 or rescuer is in use, flushing another work
2477          * item on the same workqueue may lead to deadlock.  Make sure the
2478          * flusher is not running on the same workqueue by verifying write
2479          * access.
2480          */
2481         if (cwq->wq->saved_max_active == 1 || cwq->wq->flags & WQ_RESCUER)
2482                 lock_map_acquire(&cwq->wq->lockdep_map);
2483         else
2484                 lock_map_acquire_read(&cwq->wq->lockdep_map);
2485         lock_map_release(&cwq->wq->lockdep_map);
2486
2487         return true;
2488 already_gone:
2489         spin_unlock_irq(&gcwq->lock);
2490         return false;
2491 }
2492
2493 /**
2494  * flush_work - wait for a work to finish executing the last queueing instance
2495  * @work: the work to flush
2496  *
2497  * Wait until @work has finished execution.  This function considers
2498  * only the last queueing instance of @work.  If @work has been
2499  * enqueued across different CPUs on a non-reentrant workqueue or on
2500  * multiple workqueues, @work might still be executing on return on
2501  * some of the CPUs from earlier queueing.
2502  *
2503  * If @work was queued only on a non-reentrant, ordered or unbound
2504  * workqueue, @work is guaranteed to be idle on return if it hasn't
2505  * been requeued since flush started.
2506  *
2507  * RETURNS:
2508  * %true if flush_work() waited for the work to finish execution,
2509  * %false if it was already idle.
2510  */
2511 bool flush_work(struct work_struct *work)
2512 {
2513         struct wq_barrier barr;
2514
2515         lock_map_acquire(&work->lockdep_map);
2516         lock_map_release(&work->lockdep_map);
2517
2518         if (start_flush_work(work, &barr, true)) {
2519                 wait_for_completion(&barr.done);
2520                 destroy_work_on_stack(&barr.work);
2521                 return true;
2522         } else
2523                 return false;
2524 }
2525 EXPORT_SYMBOL_GPL(flush_work);
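
/*
 * Illustrative sketch (not part of the original file): waiting for one
 * specific work item rather than flushing the whole workqueue.
 * example_update_work, example_update_fn() and example_wait_for_update()
 * are hypothetical.
 */
static void example_update_fn(struct work_struct *work)
{
	/* recompute some cached state */
}

static DECLARE_WORK(example_update_work, example_update_fn);

static void example_wait_for_update(void)
{
	schedule_work(&example_update_work);

	/*
	 * Returns %true if it had to wait for the last queueing instance,
	 * %false if the item was already idle.
	 */
	flush_work(&example_update_work);
}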
2526
2527 static bool wait_on_cpu_work(struct global_cwq *gcwq, struct work_struct *work)
2528 {
2529         struct wq_barrier barr;
2530         struct worker *worker;
2531
2532         spin_lock_irq(&gcwq->lock);
2533
2534         worker = find_worker_executing_work(gcwq, work);
2535         if (unlikely(worker))
2536                 insert_wq_barrier(worker->current_cwq, &barr, work, worker);
2537
2538         spin_unlock_irq(&gcwq->lock);
2539
2540         if (unlikely(worker)) {
2541                 wait_for_completion(&barr.done);
2542                 destroy_work_on_stack(&barr.work);
2543                 return true;
2544         } else
2545                 return false;
2546 }
2547
2548 static bool wait_on_work(struct work_struct *work)
2549 {
2550         bool ret = false;
2551         int cpu;
2552
2553         might_sleep();
2554
2555         lock_map_acquire(&work->lockdep_map);
2556         lock_map_release(&work->lockdep_map);
2557
2558         for_each_gcwq_cpu(cpu)
2559                 ret |= wait_on_cpu_work(get_gcwq(cpu), work);
2560         return ret;
2561 }
2562
2563 /**
2564  * flush_work_sync - wait until a work has finished execution
2565  * @work: the work to flush
2566  *
2567  * Wait until @work has finished execution.  On return, it's
2568  * guaranteed that all queueing instances of @work which happened
2569  * before this function is called are finished.  In other words, if
2570  * @work hasn't been requeued since this function was called, @work is
2571  * guaranteed to be idle on return.
2572  *
2573  * RETURNS:
2574  * %true if flush_work_sync() waited for the work to finish execution,
2575  * %false if it was already idle.
2576  */
2577 bool flush_work_sync(struct work_struct *work)
2578 {
2579         struct wq_barrier barr;
2580         bool pending, waited;
2581
2582         /* we'll wait for executions separately, queue barr only if pending */
2583         pending = start_flush_work(work, &barr, false);
2584
2585         /* wait for executions to finish */
2586         waited = wait_on_work(work);
2587
2588         /* wait for the pending one */
2589         if (pending) {
2590                 wait_for_completion(&barr.done);
2591                 destroy_work_on_stack(&barr.work);
2592         }
2593
2594         return pending || waited;
2595 }
2596 EXPORT_SYMBOL_GPL(flush_work_sync);
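
/*
 * Illustrative sketch (not part of the original file): unlike flush_work(),
 * flush_work_sync() also waits for executions started by earlier queueing
 * instances on other CPUs.  example_stat_work, example_stat_fn() and
 * example_sync_stats() are hypothetical.
 */
static void example_stat_fn(struct work_struct *work)
{
	/* folds per-cpu statistics into a global counter */
}

static DECLARE_WORK(example_stat_work, example_stat_fn);

static void example_sync_stats(void)
{
	/* the same item may have been queued to different CPUs over time */
	queue_work_on(0, system_wq, &example_stat_work);

	/* guarantees every queueing instance so far has finished */
	flush_work_sync(&example_stat_work);
}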
2597
2598 /*
2599  * Upon a successful return (>= 0), the caller "owns" WORK_STRUCT_PENDING bit,
2600  * so this work can't be re-armed in any way.
2601  */
2602 static int try_to_grab_pending(struct work_struct *work)
2603 {
2604         struct global_cwq *gcwq;
2605         int ret = -1;
2606
2607         if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work)))
2608                 return 0;
2609
2610         /*
2611          * The queueing is in progress, or it is already queued. Try to
2612          * steal it from ->worklist without clearing WORK_STRUCT_PENDING.
2613          */
2614         gcwq = get_work_gcwq(work);
2615         if (!gcwq)
2616                 return ret;
2617
2618         spin_lock_irq(&gcwq->lock);
2619         if (!list_empty(&work->entry)) {
2620                 /*
2621                  * This work is queued, but perhaps we locked the wrong gcwq.
2622                  * In that case we must see the new value after rmb(), see
2623                  * insert_work()->wmb().
2624                  */
2625                 smp_rmb();
2626                 if (gcwq == get_work_gcwq(work)) {
2627                         debug_work_deactivate(work);
2628                         list_del_init(&work->entry);
2629                         cwq_dec_nr_in_flight(get_work_cwq(work),
2630                                 get_work_color(work),
2631                                 *work_data_bits(work) & WORK_STRUCT_DELAYED);
2632                         ret = 1;
2633                 }
2634         }
2635         spin_unlock_irq(&gcwq->lock);
2636
2637         return ret;
2638 }
2639
2640 static bool __cancel_work_timer(struct work_struct *work,
2641                                 struct timer_list* timer)
2642 {
2643         int ret;
2644
2645         do {
2646                 ret = (timer && likely(del_timer(timer)));
2647                 if (!ret)
2648                         ret = try_to_grab_pending(work);
2649                 wait_on_work(work);
2650         } while (unlikely(ret < 0));
2651
2652         clear_work_data(work);
2653         return ret;
2654 }
2655
2656 /**
2657  * cancel_work_sync - cancel a work and wait for it to finish
2658  * @work: the work to cancel
2659  *
2660  * Cancel @work and wait for its execution to finish.  This function
2661  * can be used even if the work re-queues itself or migrates to
2662  * another workqueue.  On return from this function, @work is
2663  * guaranteed to be not pending or executing on any CPU.
2664  *
2665  * cancel_work_sync(&delayed_work->work) must not be used for
2666  * delayed_work's.  Use cancel_delayed_work_sync() instead.
2667  *
2668  * The caller must ensure that the workqueue on which @work was last
2669  * queued can't be destroyed before this function returns.
2670  *
2671  * RETURNS:
2672  * %true if @work was pending, %false otherwise.
2673  */
2674 bool cancel_work_sync(struct work_struct *work)
2675 {
2676         return __cancel_work_timer(work, NULL);
2677 }
2678 EXPORT_SYMBOL_GPL(cancel_work_sync);
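
/*
 * Illustrative sketch (not part of the original file): the usual device
 * removal pattern.  cancel_work_sync() is safe even if the item re-queues
 * itself.  struct example_dev, example_dev_irq_fn(), example_dev_probe()
 * and example_dev_remove() are hypothetical.
 */
struct example_dev {
	struct work_struct	irq_work;
	/* ... device state used by irq_work ... */
};

static void example_dev_irq_fn(struct work_struct *work)
{
	/* handles deferred interrupt work for the device */
}

static struct example_dev *example_dev_probe(void)
{
	struct example_dev *edev = kzalloc(sizeof(*edev), GFP_KERNEL);

	if (edev)
		INIT_WORK(&edev->irq_work, example_dev_irq_fn);
	return edev;
}

static void example_dev_remove(struct example_dev *edev)
{
	/*
	 * After this returns, edev->irq_work is neither pending nor running
	 * anywhere, so the device state can be freed safely.
	 */
	cancel_work_sync(&edev->irq_work);
	kfree(edev);
}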
2679
2680 /**
2681  * flush_delayed_work - wait for a dwork to finish executing the last queueing
2682  * @dwork: the delayed work to flush
2683  *
2684  * Delayed timer is cancelled and the pending work is queued for
2685  * immediate execution.  Like flush_work(), this function only
2686  * considers the last queueing instance of @dwork.
2687  *
2688  * RETURNS:
2689  * %true if flush_work() waited for the work to finish execution,
2690  * %false if it was already idle.
2691  */
2692 bool flush_delayed_work(struct delayed_work *dwork)
2693 {
2694         if (del_timer_sync(&dwork->timer))
2695                 __queue_work(raw_smp_processor_id(),
2696                              get_work_cwq(&dwork->work)->wq, &dwork->work);
2697         return flush_work(&dwork->work);
2698 }
2699 EXPORT_SYMBOL(flush_delayed_work);
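
/*
 * Illustrative sketch (not part of the original file): forcing a deferred
 * operation to happen now.  The timer is cancelled, the work is queued
 * immediately and then flushed.  example_sync_dwork, example_sync_fn() and
 * example_sync_now() are hypothetical.
 */
static void example_sync_fn(struct work_struct *work)
{
	/* write back cached state */
}

static DECLARE_DELAYED_WORK(example_sync_dwork, example_sync_fn);

static void example_sync_now(void)
{
	/* normally runs ten seconds after the last change ... */
	schedule_delayed_work(&example_sync_dwork, 10 * HZ);

	/* ... but a suspend or unmount path can push it through right away */
	flush_delayed_work(&example_sync_dwork);
}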
2700
2701 /**
2702  * flush_delayed_work_sync - wait for a dwork to finish
2703  * @dwork: the delayed work to flush
2704  *
2705  * Delayed timer is cancelled and the pending work is queued for
2706  * execution immediately.  Other than timer handling, its behavior
2707  * is identical to flush_work_sync().
2708  *
2709  * RETURNS:
2710  * %true if flush_work_sync() waited for the work to finish execution,
2711  * %false if it was already idle.
2712  */
2713 bool flush_delayed_work_sync(struct delayed_work *dwork)
2714 {
2715         if (del_timer_sync(&dwork->timer))
2716                 __queue_work(raw_smp_processor_id(),
2717                              get_work_cwq(&dwork->work)->wq, &dwork->work);
2718         return flush_work_sync(&dwork->work);
2719 }
2720 EXPORT_SYMBOL(flush_delayed_work_sync);
2721
2722 /**
2723  * cancel_delayed_work_sync - cancel a delayed work and wait for it to finish
2724  * @dwork: the delayed work to cancel
2725  *
2726  * This is cancel_work_sync() for delayed works.
2727  *
2728  * RETURNS:
2729  * %true if @dwork was pending, %false otherwise.
2730  */
2731 bool cancel_delayed_work_sync(struct delayed_work *dwork)
2732 {
2733         return __cancel_work_timer(&dwork->work, &dwork->timer);
2734 }
2735 EXPORT_SYMBOL(cancel_delayed_work_sync);
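
/*
 * Illustrative sketch (not part of the original file): stopping a periodic,
 * self-rearming poll cleanly.  example_poll_dwork, example_poll_fn() and
 * example_poll_stop() are hypothetical.
 */
static void example_poll_fn(struct work_struct *work);
static DECLARE_DELAYED_WORK(example_poll_dwork, example_poll_fn);

static void example_poll_fn(struct work_struct *work)
{
	/* sample device state, then re-arm for the next period */
	schedule_delayed_work(&example_poll_dwork, HZ);
}

static void example_poll_stop(void)
{
	/* cancels the timer and waits out any in-flight execution, even
	 * though that execution may try to re-arm itself */
	cancel_delayed_work_sync(&example_poll_dwork);
}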
2736
2737 /**
2738  * schedule_work - put work task in global workqueue
2739  * @work: job to be done
2740  *
2741  * Returns zero if @work was already on the kernel-global workqueue and
2742  * non-zero otherwise.
2743  *
2744  * This puts a job in the kernel-global workqueue if it was not already
2745  * queued and leaves it in the same position on the kernel-global
2746  * workqueue otherwise.
2747  */
2748 int schedule_work(struct work_struct *work)
2749 {
2750         return queue_work(system_wq, work);
2751 }
2752 EXPORT_SYMBOL(schedule_work);
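
/*
 * Illustrative sketch (not part of the original file): the classic
 * bottom-half deferral pattern schedule_work() exists for.
 * example_bh_fn(), example_bh_work and example_hardirq_path() are
 * hypothetical.
 */
static void example_bh_fn(struct work_struct *work)
{
	/* sleeping calls are fine here; we're in process context */
}

static DECLARE_WORK(example_bh_work, example_bh_fn);

static void example_hardirq_path(void)
{
	/* runs in hardirq context and can't sleep; defer the heavy lifting
	 * to the kernel-global workqueue */
	schedule_work(&example_bh_work);
}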
2753
2754 /**
2755  * schedule_work_on - put work task on a specific cpu
2756  * @cpu: cpu to put the work task on
2757  * @work: job to be done
2758  *
2759  * This puts a job on a specific cpu.
2760  */
2761 int schedule_work_on(int cpu, struct work_struct *work)
2762 {
2763         return queue_work_on(cpu, system_wq, work);
2764 }
2765 EXPORT_SYMBOL(schedule_work_on);
2766
2767 /**
2768  * schedule_delayed_work - put work task in global workqueue after delay
2769  * @dwork: job to be done
2770  * @delay: number of jiffies to wait or 0 for immediate execution
2771  *
2772  * After waiting for a given time this puts a job in the kernel-global
2773  * workqueue.
2774  */
2775 int schedule_delayed_work(struct delayed_work *dwork,
2776                                         unsigned long delay)
2777 {
2778         return queue_delayed_work(system_wq, dwork, delay);
2779 }
2780 EXPORT_SYMBOL(schedule_delayed_work);
2781
2782 /**
2783  * schedule_delayed_work_on - queue work in global workqueue on CPU after delay
2784  * @cpu: cpu to use
2785  * @dwork: job to be done
2786  * @delay: number of jiffies to wait
2787  *
2788  * After waiting for a given time this puts a job in the kernel-global
2789  * workqueue on the specified CPU.
2790  */
2791 int schedule_delayed_work_on(int cpu,
2792                         struct delayed_work *dwork, unsigned long delay)
2793 {
2794         return queue_delayed_work_on(cpu, system_wq, dwork, delay);
2795 }
2796 EXPORT_SYMBOL(schedule_delayed_work_on);
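
/*
 * Illustrative sketch (not part of the original file): arming a delayed
 * work item on a particular CPU, e.g. to flush that CPU's private cache a
 * while after it was last touched.  example_flush_fn(), example_flush_dwork
 * and example_arm_flush() are hypothetical.
 */
static void example_flush_fn(struct work_struct *work)
{
	/* runs on the CPU it was queued on, as long as that CPU stays up */
}

static DECLARE_DELAYED_WORK(example_flush_dwork, example_flush_fn);

static void example_arm_flush(int cpu)
{
	/* run example_flush_fn() on @cpu roughly one second from now */
	schedule_delayed_work_on(cpu, &example_flush_dwork, HZ);
}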
2797
2798 /**
2799  * schedule_on_each_cpu - execute a function synchronously on each online CPU
2800  * @func: the function to call
2801  *
2802  * schedule_on_each_cpu() executes @func on each online CPU using the
2803  * system workqueue and blocks until all CPUs have completed.
2804  * schedule_on_each_cpu() is very slow.
2805  *
2806  * RETURNS:
2807  * 0 on success, -errno on failure.
2808  */
2809 int schedule_on_each_cpu(work_func_t func)
2810 {
2811         int cpu;
2812         struct work_struct __percpu *works;
2813
2814         works = alloc_percpu(struct work_struct);
2815         if (!works)
2816                 return -ENOMEM;
2817
2818         get_online_cpus();
2819
2820         for_each_online_cpu(cpu) {
2821                 struct work_struct *work = per_cpu_ptr(works, cpu);
2822
2823                 INIT_WORK(work, func);
2824                 schedule_work_on(cpu, work);
2825         }
2826
2827         for_each_online_cpu(cpu)
2828                 flush_work(per_cpu_ptr(works, cpu));
2829
2830         put_online_cpus();
2831         free_percpu(works);
2832         return 0;
2833 }
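
/*
 * Illustrative sketch (not part of the original file): running a function
 * once on every online CPU and waiting for all of them, e.g. to drain
 * per-cpu caches.  example_drain_cpu_cache() and example_drain_all() are
 * hypothetical.
 */
static void example_drain_cpu_cache(struct work_struct *work)
{
	/* executes on each online CPU, in process context */
}

static int example_drain_all(void)
{
	/* blocks until every online CPU has run the callback; slow path only */
	return schedule_on_each_cpu(example_drain_cpu_cache);
}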
2834
2835 /**
2836  * flush_scheduled_work - ensure that any scheduled work has run to completion.
2837  *
2838  * Forces execution of the kernel-global workqueue and blocks until its
2839  * completion.
2840  *
2841  * Think twice before calling this function!  It's very easy to get into
2842  * trouble if you don't take great care.  Either of the following situations
2843  * will lead to deadlock:
2844  *
2845  *      One of the work items currently on the workqueue needs to acquire
2846  *      a lock held by your code or its caller.
2847  *
2848  *      Your code is running in the context of a work routine.
2849  *
2850  * They will be detected by lockdep when they occur, but the first might not
2851  * occur very often.  It depends on what work items are on the workqueue and
2852  * what locks they need, which you have no control over.
2853  *
2854  * In most situations flushing the entire workqueue is overkill; you merely
2855  * need to know that a particular work item isn't queued and isn't running.
2856  * In such cases you should use cancel_delayed_work_sync() or
2857  * cancel_work_sync() instead.
2858  */
2859 void flush_scheduled_work(void)
2860 {
2861         flush_workqueue(system_wq);
2862 }
2863 EXPORT_SYMBOL(flush_scheduled_work);
2864
2865 /**
2866  * execute_in_process_context - reliably execute the routine with user context
2867  * @fn:         the function to execute
2868  * @ew:         guaranteed storage for the execute work structure (must
2869  *              be available when the work executes)
2870  *
2871  * Executes the function immediately if process context is available,
2872  * otherwise schedules the function for delayed execution.
2873  *
2874  * Returns:     0 - function was executed
2875  *              1 - function was scheduled for execution
2876  */
2877 int execute_in_process_context(work_func_t fn, struct execute_work *ew)
2878 {
2879         if (!in_interrupt()) {
2880                 fn(&ew->work);
2881                 return 0;
2882         }
2883
2884         INIT_WORK(&ew->work, fn);
2885         schedule_work(&ew->work);
2886
2887         return 1;
2888 }
2889 EXPORT_SYMBOL_GPL(execute_in_process_context);
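
/*
 * Illustrative sketch (not part of the original file): releasing an object
 * whose final put may come from interrupt context.  struct example_obj,
 * example_obj_release() and example_obj_put_final() are hypothetical.
 */
struct example_obj {
	struct execute_work	ew;
	/* ... other fields ... */
};

static void example_obj_release(struct work_struct *work)
{
	struct example_obj *obj = container_of(work, struct example_obj,
					       ew.work);

	kfree(obj);
}

static void example_obj_put_final(struct example_obj *obj)
{
	/* runs inline if we can sleep, otherwise via the global workqueue */
	execute_in_process_context(example_obj_release, &obj->ew);
}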
2890
2891 int keventd_up(void)
2892 {
2893         return system_wq != NULL;
2894 }
2895
2896 static int alloc_cwqs(struct workqueue_struct *wq)
2897 {
2898         /*
2899          * cwqs are force-aligned according to WORK_STRUCT_FLAG_BITS.
2900          * Make sure that the alignment isn't lower than that of
2901          * unsigned long long.
2902          */
2903         const size_t size = sizeof(struct cpu_workqueue_struct);
2904         const size_t align = max_t(size_t, 1 << WORK_STRUCT_FLAG_BITS,
2905                                    __alignof__(unsigned long long));
2906
2907         if (!(wq->flags & WQ_UNBOUND))
2908                 wq->cpu_wq.pcpu = __alloc_percpu(size, align);
2909         else {
2910                 void *ptr;
2911
2912                 /*
2913                  * Allocate enough room to align cwq and put an extra
2914                  * pointer at the end pointing back to the originally
2915                  * allocated pointer, which will be used when freeing.
2916                  */
2917                 ptr = kzalloc(size + align + sizeof(void *), GFP_KERNEL);
2918                 if (ptr) {
2919                         wq->cpu_wq.single = PTR_ALIGN(ptr, align);
2920                         *(void **)(wq->cpu_wq.single + 1) = ptr;
2921                 }
2922         }
2923
2924         /* just in case, make sure it's actually aligned */
2925         BUG_ON(!IS_ALIGNED(wq->cpu_wq.v, align));
2926         return wq->cpu_wq.v ? 0 : -ENOMEM;
2927 }
2928
2929 static void free_cwqs(struct workqueue_struct *wq)
2930 {
2931         if (!(wq->flags & WQ_UNBOUND))
2932                 free_percpu(wq->cpu_wq.pcpu);
2933         else if (wq->cpu_wq.single) {
2934                 /* the pointer to free is stored right after the cwq */
2935                 kfree(*(void **)(wq->cpu_wq.single + 1));
2936         }
2937 }
2938
2939 static int wq_clamp_max_active(int max_active, unsigned int flags,
2940                                const char *name)
2941 {
2942         int lim = flags & WQ_UNBOUND ? WQ_UNBOUND_MAX_ACTIVE : WQ_MAX_ACTIVE;
2943
2944         if (max_active < 1 || max_active > lim)
2945                 printk(KERN_WARNING "workqueue: max_active %d requested for %s "
2946                        "is out of range, clamping between %d and %d\n",
2947                        max_active, name, 1, lim);
2948
2949         return clamp_val(max_active, 1, lim);
2950 }
2951
2952 struct workqueue_struct *__alloc_workqueue_key(const char *fmt,
2953                                                unsigned int flags,
2954                                                int max_active,
2955                                                struct lock_class_key *key,
2956                                                const char *lock_name, ...)
2957 {
2958         va_list args, args1;
2959         struct workqueue_struct *wq;
2960         unsigned int cpu;
2961         size_t namelen;
2962
2963         /* determine namelen, allocate wq and format name */
2964         va_start(args, lock_name);
2965         va_copy(args1, args);
2966         namelen = vsnprintf(NULL, 0, fmt, args) + 1;
2967
2968         wq = kzalloc(sizeof(*wq) + namelen, GFP_KERNEL);
2969         if (!wq)
2970                 goto err;
2971
2972         vsnprintf(wq->name, namelen, fmt, args1);
2973         va_end(args);
2974         va_end(args1);
2975
2976         /*
2977          * Workqueues which may be used during memory reclaim should
2978          * have a rescuer to guarantee forward progress.
2979          */
2980         if (flags & WQ_MEM_RECLAIM)
2981                 flags |= WQ_RESCUER;
2982
2983         max_active = max_active ?: WQ_DFL_ACTIVE;
2984         max_active = wq_clamp_max_active(max_active, flags, wq->name);
2985
2986         /* init wq */
2987         wq->flags = flags;
2988         wq->saved_max_active = max_active;
2989         mutex_init(&wq->flush_mutex);
2990         atomic_set(&wq->nr_cwqs_to_flush, 0);
2991         INIT_LIST_HEAD(&wq->flusher_queue);
2992         INIT_LIST_HEAD(&wq->flusher_overflow);
2993
2994         lockdep_init_map(&wq->lockdep_map, lock_name, key, 0);
2995         INIT_LIST_HEAD(&wq->list);
2996
2997         if (alloc_cwqs(wq) < 0)
2998                 goto err;
2999
3000         for_each_cwq_cpu(cpu, wq) {
3001                 struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
3002                 struct global_cwq *gcwq = get_gcwq(cpu);
3003                 int pool_idx = (bool)(flags & WQ_HIGHPRI);
3004
3005                 BUG_ON((unsigned long)cwq & WORK_STRUCT_FLAG_MASK);
3006                 cwq->pool = &gcwq->pools[pool_idx];
3007                 cwq->wq = wq;
3008                 cwq->flush_color = -1;
3009                 cwq->max_active = max_active;
3010                 INIT_LIST_HEAD(&cwq->delayed_works);
3011         }
3012
3013         if (flags & WQ_RESCUER) {
3014                 struct worker *rescuer;
3015
3016                 if (!alloc_mayday_mask(&wq->mayday_mask, GFP_KERNEL))
3017                         goto err;
3018
3019                 wq->rescuer = rescuer = alloc_worker();
3020                 if (!rescuer)
3021                         goto err;
3022
3023                 rescuer->task = kthread_create(rescuer_thread, wq, "%s",
3024                                                wq->name);
3025                 if (IS_ERR(rescuer->task))
3026                         goto err;
3027
3028                 rescuer->task->flags |= PF_THREAD_BOUND;
3029                 wake_up_process(rescuer->task);
3030         }
3031
3032         /*
3033          * workqueue_lock protects global freeze state and workqueues
3034          * list.  Grab it, set max_active accordingly and add the new
3035          * workqueue to workqueues list.
3036          */
3037         spin_lock(&workqueue_lock);
3038
3039         if (workqueue_freezing && wq->flags & WQ_FREEZABLE)
3040                 for_each_cwq_cpu(cpu, wq)
3041                         get_cwq(cpu, wq)->max_active = 0;
3042
3043         list_add(&wq->list, &workqueues);
3044
3045         spin_unlock(&workqueue_lock);
3046
3047         return wq;
3048 err:
3049         if (wq) {
3050                 free_cwqs(wq);
3051                 free_mayday_mask(wq->mayday_mask);
3052                 kfree(wq->rescuer);
3053                 kfree(wq);
3054         }
3055         return NULL;
3056 }
3057 EXPORT_SYMBOL_GPL(__alloc_workqueue_key);
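/*
 * Illustrative caller sketch with hypothetical names: this function is
 * normally reached through the alloc_workqueue() wrapper.  A workqueue
 * flagged WQ_MEM_RECLAIM gets a rescuer as enforced above, and WQ_HIGHPRI
 * makes its cwqs point at the per-gcwq highpri worker_pool.
 *
 *	struct workqueue_struct *my_wq;
 *
 *	my_wq = alloc_workqueue("my_wq", WQ_MEM_RECLAIM | WQ_HIGHPRI, 1);
 *	if (!my_wq)
 *		return -ENOMEM;
 *	queue_work(my_wq, &my_work);	// my_work: caller's work_struct
 */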
3058
3059 /**
3060  * destroy_workqueue - safely terminate a workqueue
3061  * @wq: target workqueue
3062  *
3063  * Safely destroy a workqueue. All work currently pending will be done first.
3064  */
3065 void destroy_workqueue(struct workqueue_struct *wq)
3066 {
3067         unsigned int cpu;
3068
3069         /* drain it before proceeding with destruction */
3070         drain_workqueue(wq);
3071
3072         /*
3073          * wq list is used to freeze wq, remove from list after
3074          * flushing is complete in case freeze races us.
3075          */
3076         spin_lock(&workqueue_lock);
3077         list_del(&wq->list);
3078         spin_unlock(&workqueue_lock);
3079
3080         /* sanity check */
3081         for_each_cwq_cpu(cpu, wq) {
3082                 struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
3083                 int i;
3084
3085                 for (i = 0; i < WORK_NR_COLORS; i++)
3086                         BUG_ON(cwq->nr_in_flight[i]);
3087                 BUG_ON(cwq->nr_active);
3088                 BUG_ON(!list_empty(&cwq->delayed_works));
3089         }
3090
3091         if (wq->flags & WQ_RESCUER) {
3092                 kthread_stop(wq->rescuer->task);
3093                 free_mayday_mask(wq->mayday_mask);
3094                 kfree(wq->rescuer);
3095         }
3096
3097         free_cwqs(wq);
3098         kfree(wq);
3099 }
3100 EXPORT_SYMBOL_GPL(destroy_workqueue);
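/*
 * Illustrative teardown sketch with hypothetical names: callers are
 * expected to stop the sources of new work first; destroy_workqueue()
 * then drains whatever is already queued before freeing the workqueue.
 *
 *	cancel_work_sync(&my_work);		// stop self-requeueing work
 *	cancel_delayed_work_sync(&my_dwork);
 *	destroy_workqueue(my_wq);
 */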
3101
3102 /**
3103  * workqueue_set_max_active - adjust max_active of a workqueue
3104  * @wq: target workqueue
3105  * @max_active: new max_active value.
3106  *
3107  * Set max_active of @wq to @max_active.
3108  *
3109  * CONTEXT:
3110  * Don't call from IRQ context.
3111  */
3112 void workqueue_set_max_active(struct workqueue_struct *wq, int max_active)
3113 {
3114         unsigned int cpu;
3115
3116         max_active = wq_clamp_max_active(max_active, wq->flags, wq->name);
3117
3118         spin_lock(&workqueue_lock);
3119
3120         wq->saved_max_active = max_active;
3121
3122         for_each_cwq_cpu(cpu, wq) {
3123                 struct global_cwq *gcwq = get_gcwq(cpu);
3124
3125                 spin_lock_irq(&gcwq->lock);
3126
3127                 if (!(wq->flags & WQ_FREEZABLE) ||
3128                     !(gcwq->flags & GCWQ_FREEZING))
3129                         get_cwq(gcwq->cpu, wq)->max_active = max_active;
3130
3131                 spin_unlock_irq(&gcwq->lock);
3132         }
3133
3134         spin_unlock(&workqueue_lock);
3135 }
3136 EXPORT_SYMBOL_GPL(workqueue_set_max_active);
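/*
 * Illustrative use with hypothetical names: temporarily throttle an
 * existing workqueue to one in-flight work item per cwq and later restore
 * a larger limit (WQ_DFL_ACTIVE is the default used at allocation time).
 *
 *	workqueue_set_max_active(my_wq, 1);
 *	...
 *	workqueue_set_max_active(my_wq, WQ_DFL_ACTIVE);
 */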
3137
3138 /**
3139  * workqueue_congested - test whether a workqueue is congested
3140  * @cpu: CPU in question
3141  * @wq: target workqueue
3142  *
3143  * Test whether @wq's cpu workqueue for @cpu is congested.  There is
3144  * no synchronization around this function and the test result is
3145  * unreliable and only useful as advisory hints or for debugging.
3146  *
3147  * RETURNS:
3148  * %true if congested, %false otherwise.
3149  */
3150 bool workqueue_congested(unsigned int cpu, struct workqueue_struct *wq)
3151 {
3152         struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
3153
3154         return !list_empty(&cwq->delayed_works);
3155 }
3156 EXPORT_SYMBOL_GPL(workqueue_congested);
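/*
 * Illustrative use with hypothetical names: because the result is only an
 * advisory hint, it suits best-effort back-off rather than correctness
 * decisions.
 *
 *	if (workqueue_congested(cpu, my_wq))
 *		defer_or_drop(item);		// hypothetical slow path
 *	else
 *		queue_work_on(cpu, my_wq, &item->work);
 */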
3157
3158 /**
3159  * work_cpu - return the last known associated cpu for @work
3160  * @work: the work of interest
3161  *
3162  * RETURNS:
3163  * CPU number if @work was ever queued.  WORK_CPU_NONE otherwise.
3164  */
3165 unsigned int work_cpu(struct work_struct *work)
3166 {
3167         struct global_cwq *gcwq = get_work_gcwq(work);
3168
3169         return gcwq ? gcwq->cpu : WORK_CPU_NONE;
3170 }
3171 EXPORT_SYMBOL_GPL(work_cpu);
3172
3173 /**
3174  * work_busy - test whether a work is currently pending or running
3175  * @work: the work to be tested
3176  *
3177  * Test whether @work is currently pending or running.  There is no
3178  * synchronization around this function and the test result is
3179  * unreliable and only useful as advisory hints or for debugging.
3180  * Especially for reentrant wqs, the pending state might hide the
3181  * running state.
3182  *
3183  * RETURNS:
3184  * OR'd bitmask of WORK_BUSY_* bits.
3185  */
3186 unsigned int work_busy(struct work_struct *work)
3187 {
3188         struct global_cwq *gcwq = get_work_gcwq(work);
3189         unsigned long flags;
3190         unsigned int ret = 0;
3191
3192         if (!gcwq)
3193                 return 0;
3194
3195         spin_lock_irqsave(&gcwq->lock, flags);
3196
3197         if (work_pending(work))
3198                 ret |= WORK_BUSY_PENDING;
3199         if (find_worker_executing_work(gcwq, work))
3200                 ret |= WORK_BUSY_RUNNING;
3201
3202         spin_unlock_irqrestore(&gcwq->lock, flags);
3203
3204         return ret;
3205 }
3206 EXPORT_SYMBOL_GPL(work_busy);
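/*
 * Illustrative use with a hypothetical work item: the advisory result is
 * best suited to debugging or statistics style reporting.
 *
 *	unsigned int busy = work_busy(&my_work);
 *
 *	pr_debug("my_work: %s%s\n",
 *		 busy & WORK_BUSY_PENDING ? "pending " : "",
 *		 busy & WORK_BUSY_RUNNING ? "running" : "");
 */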
3207
3208 /*
3209  * CPU hotplug.
3210  *
3211  * There are two challenges in supporting CPU hotplug.  Firstly, there
3212  * are a lot of assumptions on strong associations among work, cwq and
3213  * gcwq which make migrating pending and scheduled works very
3214  * difficult to implement without impacting hot paths.  Secondly,
3215  * gcwqs serve a mix of short, long and very long running works, making
3216  * blocked draining impractical.
3217  *
3218  * This is solved by allowing a gcwq to be detached from its CPU, running
3219  * it with unbound (rogue) workers and allowing it to be reattached
3220  * later if the cpu comes back online.  A separate thread is created
3221  * to govern a gcwq in such state and is called the trustee of the
3222  * gcwq.
3223  *
3224  * Trustee states and their descriptions.
3225  *
3226  * START        Command state used on startup.  On CPU_DOWN_PREPARE, a
3227  *              new trustee is started with this state.
3228  *
3229  * IN_CHARGE    Once started, trustee will enter this state after
3230  *              assuming the manager role and making all existing
3231  *              workers rogue.  DOWN_PREPARE waits for trustee to
3232  *              enter this state.  After reaching IN_CHARGE, trustee
3233  *              tries to execute the pending worklist until it's empty
3234  *              and the state is set to BUTCHER, or the state is set
3235  *              to RELEASE.
3236  *
3237  * BUTCHER      Command state which is set by the cpu callback after
3238  *              the cpu has gone down.  Once this state is set, the trustee
3239  *              knows that there will be no new works on the worklist
3240  *              and once the worklist is empty it can proceed to
3241  *              killing idle workers.
3242  *
3243  * RELEASE      Command state which is set by the cpu callback if the
3244  *              cpu down has been canceled or it has come online
3245  *              again.  After recognizing this state, trustee stops
3246  *              trying to drain or butcher and clears ROGUE, rebinds
3247  *              all remaining workers back to the cpu and releases
3248  *              manager role.
3249  *
3250  * DONE         Trustee will enter this state after BUTCHER or RELEASE
3251  *              is complete.
3252  *
3253  *          trustee                 CPU                draining
3254  *         took over                down               complete
3255  * START -----------> IN_CHARGE -----------> BUTCHER -----------> DONE
3256  *                        |                     |                  ^
3257  *                        | CPU is back online  v   return workers |
3258  *                         ----------------> RELEASE --------------
3259  */
3260
3261 /**
3262  * trustee_wait_event_timeout - timed event wait for trustee
3263  * @cond: condition to wait for
3264  * @timeout: timeout in jiffies
3265  *
3266  * wait_event_timeout() for trustee to use.  Handles locking and
3267  * checks for RELEASE request.
3268  *
3269  * CONTEXT:
3270  * spin_lock_irq(gcwq->lock) which may be released and regrabbed
3271  * multiple times.  To be used by trustee.
3272  *
3273  * RETURNS:
3274  * Positive indicating remaining time if @cond is satisfied, 0 if timed
3275  * out, -1 if canceled.
3276  */
3277 #define trustee_wait_event_timeout(cond, timeout) ({                    \
3278         long __ret = (timeout);                                         \
3279         while (!((cond) || (gcwq->trustee_state == TRUSTEE_RELEASE)) && \
3280                __ret) {                                                 \
3281                 spin_unlock_irq(&gcwq->lock);                           \
3282                 __wait_event_timeout(gcwq->trustee_wait, (cond) ||      \
3283                         (gcwq->trustee_state == TRUSTEE_RELEASE),       \
3284                         __ret);                                         \
3285                 spin_lock_irq(&gcwq->lock);                             \
3286         }                                                               \
3287         gcwq->trustee_state == TRUSTEE_RELEASE ? -1 : (__ret);          \
3288 })
3289
3290 /**
3291  * trustee_wait_event - event wait for trustee
3292  * @cond: condition to wait for
3293  *
3294  * wait_event() for trustee to use.  Automatically handles locking and
3295  * checks for RELEASE request.
3296  *
3297  * CONTEXT:
3298  * spin_lock_irq(gcwq->lock) which may be released and regrabbed
3299  * multiple times.  To be used by trustee.
3300  *
3301  * RETURNS:
3302  * 0 if @cond is satisfied, -1 if canceled.
3303  */
3304 #define trustee_wait_event(cond) ({                                     \
3305         long __ret1;                                                    \
3306         __ret1 = trustee_wait_event_timeout(cond, MAX_SCHEDULE_TIMEOUT);\
3307         __ret1 < 0 ? -1 : 0;                                            \
3308 })
3309
3310 static bool gcwq_is_managing_workers(struct global_cwq *gcwq)
3311 {
3312         struct worker_pool *pool;
3313
3314         for_each_worker_pool(pool, gcwq)
3315                 if (pool->flags & POOL_MANAGING_WORKERS)
3316                         return true;
3317         return false;
3318 }
3319
3320 static bool gcwq_has_idle_workers(struct global_cwq *gcwq)
3321 {
3322         struct worker_pool *pool;
3323
3324         for_each_worker_pool(pool, gcwq)
3325                 if (!list_empty(&pool->idle_list))
3326                         return true;
3327         return false;
3328 }
3329
3330 static int __cpuinit trustee_thread(void *__gcwq)
3331 {
3332         struct global_cwq *gcwq = __gcwq;
3333         struct worker_pool *pool;
3334         struct worker *worker;
3335         struct work_struct *work;
3336         struct hlist_node *pos;
3337         long rc;
3338         int i;
3339
3340         BUG_ON(gcwq->cpu != smp_processor_id());
3341
3342         spin_lock_irq(&gcwq->lock);
3343         /*
3344          * Claim the manager position and make all workers rogue.
3345          * Trustee must be bound to the target cpu and can't be
3346          * cancelled.
3347          */
3348         BUG_ON(gcwq->cpu != smp_processor_id());
3349         rc = trustee_wait_event(!gcwq_is_managing_workers(gcwq));
3350         BUG_ON(rc < 0);
3351
3352         for_each_worker_pool(pool, gcwq) {
3353                 pool->flags |= POOL_MANAGING_WORKERS;
3354
3355                 list_for_each_entry(worker, &pool->idle_list, entry)
3356                         worker->flags |= WORKER_ROGUE;
3357         }
3358
3359         for_each_busy_worker(worker, i, pos, gcwq)
3360                 worker->flags |= WORKER_ROGUE;
3361
3362         /*
3363          * Call schedule() so that we cross rq->lock and thus can
3364          * guarantee sched callbacks see the rogue flag.  This is
3365          * necessary as scheduler callbacks may be invoked from other
3366          * cpus.
3367          */
3368         spin_unlock_irq(&gcwq->lock);
3369         schedule();
3370         spin_lock_irq(&gcwq->lock);
3371
3372         /*
3373          * Sched callbacks are disabled now.  Zap nr_running.  After
3374          * this, nr_running stays zero and need_more_worker() and
3375          * keep_working() are always true as long as the worklist is
3376          * not empty.
3377          */
3378         for_each_worker_pool(pool, gcwq)
3379                 atomic_set(get_pool_nr_running(pool), 0);
3380
3381         spin_unlock_irq(&gcwq->lock);
3382         for_each_worker_pool(pool, gcwq)
3383                 del_timer_sync(&pool->idle_timer);
3384         spin_lock_irq(&gcwq->lock);
3385
3386         /*
3387          * We're now in charge.  Notify and proceed to drain.  We need
3388          * to keep the gcwq running during the whole CPU down
3389          * procedure as other cpu hotunplug callbacks may need to
3390          * flush currently running tasks.
3391          */
3392         gcwq->trustee_state = TRUSTEE_IN_CHARGE;
3393         wake_up_all(&gcwq->trustee_wait);
3394
3395         /*
3396          * The original cpu is in the process of dying and may go away
3397          * anytime now.  When that happens, we and all workers would
3398          * be migrated to other cpus.  Try draining any left work.  We
3399  * be migrated to other cpus.  Try draining any remaining work.  We
3400          * many idlers as necessary and create new ones till the
3401          * worklist is empty.  Note that if the gcwq is frozen, there
3402          * may be frozen works in freezable cwqs.  Don't declare
3403          * completion while frozen.
3404          */
3405         while (true) {
3406                 bool busy = false;
3407
3408                 for_each_worker_pool(pool, gcwq)
3409                         busy |= pool->nr_workers != pool->nr_idle;
3410
3411                 if (!busy && !(gcwq->flags & GCWQ_FREEZING) &&
3412                     gcwq->trustee_state != TRUSTEE_IN_CHARGE)
3413                         break;
3414
3415                 for_each_worker_pool(pool, gcwq) {
3416                         int nr_works = 0;
3417
3418                         list_for_each_entry(work, &pool->worklist, entry) {
3419                                 send_mayday(work);
3420                                 nr_works++;
3421                         }
3422
3423                         list_for_each_entry(worker, &pool->idle_list, entry) {
3424                                 if (!nr_works--)
3425                                         break;
3426                                 wake_up_process(worker->task);
3427                         }
3428
3429                         if (need_to_create_worker(pool)) {
3430                                 spin_unlock_irq(&gcwq->lock);
3431                                 worker = create_worker(pool, false);
3432                                 spin_lock_irq(&gcwq->lock);
3433                                 if (worker) {
3434                                         worker->flags |= WORKER_ROGUE;
3435                                         start_worker(worker);
3436                                 }
3437                         }
3438                 }
3439
3440                 /* give a breather */
3441                 if (trustee_wait_event_timeout(false, TRUSTEE_COOLDOWN) < 0)
3442                         break;
3443         }
3444
3445         /*
3446          * Either all works have been scheduled and cpu is down, or
3447          * cpu down has already been canceled.  Wait for and butcher
3448          * all workers till we're canceled.
3449          */
3450         do {
3451                 rc = trustee_wait_event(gcwq_has_idle_workers(gcwq));
3452
3453                 i = 0;
3454                 for_each_worker_pool(pool, gcwq) {
3455                         while (!list_empty(&pool->idle_list)) {
3456                                 worker = list_first_entry(&pool->idle_list,
3457                                                           struct worker, entry);
3458                                 destroy_worker(worker);
3459                         }
3460                         i |= pool->nr_workers;
3461                 }
3462         } while (i && rc >= 0);
3463
3464         /*
3465          * At this point, either draining has completed and no worker
3466          * is left, or cpu down has been canceled or the cpu is being
3467          * brought back up.  There shouldn't be any idle one left.
3468  * Tell the remaining busy ones to rebind once they finish their
3469  * currently scheduled works by scheduling the rebind_work.
3470          */
3471         for_each_worker_pool(pool, gcwq)
3472                 WARN_ON(!list_empty(&pool->idle_list));
3473
3474         for_each_busy_worker(worker, i, pos, gcwq) {
3475                 struct work_struct *rebind_work = &worker->rebind_work;
3476
3477                 /*
3478                  * Rebind_work may race with future cpu hotplug
3479                  * operations.  Use a separate flag to mark that
3480                  * rebinding is scheduled.
3481                  */
3482                 worker->flags |= WORKER_REBIND;
3483                 worker->flags &= ~WORKER_ROGUE;
3484
3485                 /* queue rebind_work, wq doesn't matter, use the default one */
3486                 if (test_and_set_bit(WORK_STRUCT_PENDING_BIT,
3487                                      work_data_bits(rebind_work)))
3488                         continue;
3489
3490                 debug_work_activate(rebind_work);
3491                 insert_work(get_cwq(gcwq->cpu, system_wq), rebind_work,
3492                             worker->scheduled.next,
3493                             work_color_to_flags(WORK_NO_COLOR));
3494         }
3495
3496         /* relinquish manager role */
3497         for_each_worker_pool(pool, gcwq)
3498                 pool->flags &= ~POOL_MANAGING_WORKERS;
3499
3500         /* notify completion */
3501         gcwq->trustee = NULL;
3502         gcwq->trustee_state = TRUSTEE_DONE;
3503         wake_up_all(&gcwq->trustee_wait);
3504         spin_unlock_irq(&gcwq->lock);
3505         return 0;
3506 }
3507
3508 /**
3509  * wait_trustee_state - wait for trustee to enter the specified state
3510  * @gcwq: gcwq the trustee of interest belongs to
3511  * @state: target state to wait for
3512  *
3513  * Wait for the trustee to reach @state.  DONE is already matched.
3514  *
3515  * CONTEXT:
3516  * spin_lock_irq(gcwq->lock) which may be released and regrabbed
3517  * multiple times.  To be used by cpu_callback.
3518  */
3519 static void __cpuinit wait_trustee_state(struct global_cwq *gcwq, int state)
3520 __releases(&gcwq->lock)
3521 __acquires(&gcwq->lock)
3522 {
3523         if (!(gcwq->trustee_state == state ||
3524               gcwq->trustee_state == TRUSTEE_DONE)) {
3525                 spin_unlock_irq(&gcwq->lock);
3526                 __wait_event(gcwq->trustee_wait,
3527                              gcwq->trustee_state == state ||
3528                              gcwq->trustee_state == TRUSTEE_DONE);
3529                 spin_lock_irq(&gcwq->lock);
3530         }
3531 }
3532
3533 static int __devinit workqueue_cpu_callback(struct notifier_block *nfb,
3534                                                 unsigned long action,
3535                                                 void *hcpu)
3536 {
3537         unsigned int cpu = (unsigned long)hcpu;
3538         struct global_cwq *gcwq = get_gcwq(cpu);
3539         struct task_struct *new_trustee = NULL;
3540         struct worker *new_workers[NR_WORKER_POOLS] = { };
3541         struct worker_pool *pool;
3542         unsigned long flags;
3543         int i;
3544
3545         action &= ~CPU_TASKS_FROZEN;
3546
3547         switch (action) {
3548         case CPU_DOWN_PREPARE:
3549                 new_trustee = kthread_create(trustee_thread, gcwq,
3550                                              "workqueue_trustee/%d", cpu);
3551                 if (IS_ERR(new_trustee))
3552                         return notifier_from_errno(PTR_ERR(new_trustee));
3553                 kthread_bind(new_trustee, cpu);
3554                 /* fall through */
3555         case CPU_UP_PREPARE:
3556                 i = 0;
3557                 for_each_worker_pool(pool, gcwq) {
3558                         BUG_ON(pool->first_idle);
3559                         new_workers[i] = create_worker(pool, false);
3560                         if (!new_workers[i++])
3561                                 goto err_destroy;
3562                 }
3563         }
3564
3565         /* some are called w/ irq disabled, don't disturb irq status */
3566         spin_lock_irqsave(&gcwq->lock, flags);
3567
3568         switch (action) {
3569         case CPU_DOWN_PREPARE:
3570                 /* initialize trustee and tell it to acquire the gcwq */
3571                 BUG_ON(gcwq->trustee || gcwq->trustee_state != TRUSTEE_DONE);
3572                 gcwq->trustee = new_trustee;
3573                 gcwq->trustee_state = TRUSTEE_START;
3574                 wake_up_process(gcwq->trustee);
3575                 wait_trustee_state(gcwq, TRUSTEE_IN_CHARGE);
3576                 /* fall through */
3577         case CPU_UP_PREPARE:
3578                 i = 0;
3579                 for_each_worker_pool(pool, gcwq) {
3580                         BUG_ON(pool->first_idle);
3581                         pool->first_idle = new_workers[i++];
3582                 }
3583                 break;
3584
3585         case CPU_DYING:
3586                 /*
3587                  * Before this, the trustee and all workers except for
3588                  * the ones which are still executing works from
3589                  * before the last CPU down must be on the cpu.  After
3590                  * this, they'll all be diasporas.
3591                  */
3592                 gcwq->flags |= GCWQ_DISASSOCIATED;
3593                 break;
3594
3595         case CPU_POST_DEAD:
3596                 gcwq->trustee_state = TRUSTEE_BUTCHER;
3597                 /* fall through */
3598         case CPU_UP_CANCELED:
3599                 for_each_worker_pool(pool, gcwq) {
3600                         destroy_worker(pool->first_idle);
3601                         pool->first_idle = NULL;
3602                 }
3603                 break;
3604
3605         case CPU_DOWN_FAILED:
3606         case CPU_ONLINE:
3607                 gcwq->flags &= ~GCWQ_DISASSOCIATED;
3608                 if (gcwq->trustee_state != TRUSTEE_DONE) {
3609                         gcwq->trustee_state = TRUSTEE_RELEASE;
3610                         wake_up_process(gcwq->trustee);
3611                         wait_trustee_state(gcwq, TRUSTEE_DONE);
3612                 }
3613
3614                 /*
3615                  * Trustee is done and there might be no worker left.
3616                  * Put the first_idle in and request a real manager to
3617                  * take a look.
3618                  */
3619                 for_each_worker_pool(pool, gcwq) {
3620                         spin_unlock_irq(&gcwq->lock);
3621                         kthread_bind(pool->first_idle->task, cpu);
3622                         spin_lock_irq(&gcwq->lock);
3623                         pool->flags |= POOL_MANAGE_WORKERS;
3624                         start_worker(pool->first_idle);
3625                         pool->first_idle = NULL;
3626                 }
3627                 break;
3628         }
3629
3630         spin_unlock_irqrestore(&gcwq->lock, flags);
3631
3632         return notifier_from_errno(0);
3633
3634 err_destroy:
3635         if (new_trustee)
3636                 kthread_stop(new_trustee);
3637
3638         spin_lock_irqsave(&gcwq->lock, flags);
3639         for (i = 0; i < NR_WORKER_POOLS; i++)
3640                 if (new_workers[i])
3641                         destroy_worker(new_workers[i]);
3642         spin_unlock_irqrestore(&gcwq->lock, flags);
3643
3644         return NOTIFY_BAD;
3645 }
3646
3647 #ifdef CONFIG_SMP
3648
3649 struct work_for_cpu {
3650         struct completion completion;
3651         long (*fn)(void *);
3652         void *arg;
3653         long ret;
3654 };
3655
3656 static int do_work_for_cpu(void *_wfc)
3657 {
3658         struct work_for_cpu *wfc = _wfc;
3659         wfc->ret = wfc->fn(wfc->arg);
3660         complete(&wfc->completion);
3661         return 0;
3662 }
3663
3664 /**
3665  * work_on_cpu - run a function in user context on a particular cpu
3666  * @cpu: the cpu to run on
3667  * @fn: the function to run
3668  * @arg: the function arg
3669  *
3670  * This will return the value @fn returns.
3671  * It is up to the caller to ensure that the cpu doesn't go offline.
3672  * The caller must not hold any locks which would prevent @fn from completing.
3673  */
3674 long work_on_cpu(unsigned int cpu, long (*fn)(void *), void *arg)
3675 {
3676         struct task_struct *sub_thread;
3677         struct work_for_cpu wfc = {
3678                 .completion = COMPLETION_INITIALIZER_ONSTACK(wfc.completion),
3679                 .fn = fn,
3680                 .arg = arg,
3681         };
3682
3683         sub_thread = kthread_create(do_work_for_cpu, &wfc, "work_for_cpu");
3684         if (IS_ERR(sub_thread))
3685                 return PTR_ERR(sub_thread);
3686         kthread_bind(sub_thread, cpu);
3687         wake_up_process(sub_thread);
3688         wait_for_completion(&wfc.completion);
3689         return wfc.ret;
3690 }
3691 EXPORT_SYMBOL_GPL(work_on_cpu);
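/*
 * Illustrative use with hypothetical names: run a function that must
 * execute on a particular CPU while the caller pins that CPU online.
 *
 *	static long read_local_state(void *arg)
 *	{
 *		return my_read_local_state();	// hypothetical per-cpu read
 *	}
 *
 *	get_online_cpus();
 *	ret = work_on_cpu(cpu, read_local_state, NULL);
 *	put_online_cpus();
 */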
3692 #endif /* CONFIG_SMP */
3693
3694 #ifdef CONFIG_FREEZER
3695
3696 /**
3697  * freeze_workqueues_begin - begin freezing workqueues
3698  *
3699  * Start freezing workqueues.  After this function returns, all freezable
3700  * workqueues will queue new works to their delayed_works list instead of
3701  * gcwq->worklist.
3702  *
3703  * CONTEXT:
3704  * Grabs and releases workqueue_lock and gcwq->lock's.
3705  */
3706 void freeze_workqueues_begin(void)
3707 {
3708         unsigned int cpu;
3709
3710         spin_lock(&workqueue_lock);
3711
3712         BUG_ON(workqueue_freezing);
3713         workqueue_freezing = true;
3714
3715         for_each_gcwq_cpu(cpu) {
3716                 struct global_cwq *gcwq = get_gcwq(cpu);
3717                 struct workqueue_struct *wq;
3718
3719                 spin_lock_irq(&gcwq->lock);
3720
3721                 BUG_ON(gcwq->flags & GCWQ_FREEZING);
3722                 gcwq->flags |= GCWQ_FREEZING;
3723
3724                 list_for_each_entry(wq, &workqueues, list) {
3725                         struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
3726
3727                         if (cwq && wq->flags & WQ_FREEZABLE)
3728                                 cwq->max_active = 0;
3729                 }
3730
3731                 spin_unlock_irq(&gcwq->lock);
3732         }
3733
3734         spin_unlock(&workqueue_lock);
3735 }
3736
3737 /**
3738  * freeze_workqueues_busy - are freezable workqueues still busy?
3739  *
3740  * Check whether freezing is complete.  This function must be called
3741  * between freeze_workqueues_begin() and thaw_workqueues().
3742  *
3743  * CONTEXT:
3744  * Grabs and releases workqueue_lock.
3745  *
3746  * RETURNS:
3747  * %true if some freezable workqueues are still busy.  %false if freezing
3748  * is complete.
3749  */
3750 bool freeze_workqueues_busy(void)
3751 {
3752         unsigned int cpu;
3753         bool busy = false;
3754
3755         spin_lock(&workqueue_lock);
3756
3757         BUG_ON(!workqueue_freezing);
3758
3759         for_each_gcwq_cpu(cpu) {
3760                 struct workqueue_struct *wq;
3761                 /*
3762                  * nr_active is monotonically decreasing.  It's safe
3763                  * to peek without lock.
3764                  */
3765                 list_for_each_entry(wq, &workqueues, list) {
3766                         struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
3767
3768                         if (!cwq || !(wq->flags & WQ_FREEZABLE))
3769                                 continue;
3770
3771                         BUG_ON(cwq->nr_active < 0);
3772                         if (cwq->nr_active) {
3773                                 busy = true;
3774                                 goto out_unlock;
3775                         }
3776                 }
3777         }
3778 out_unlock:
3779         spin_unlock(&workqueue_lock);
3780         return busy;
3781 }
3782
3783 /**
3784  * thaw_workqueues - thaw workqueues
3785  *
3786  * Thaw workqueues.  Normal queueing is restored and all collected
3787  * frozen works are transferred to their respective gcwq worklists.
3788  *
3789  * CONTEXT:
3790  * Grabs and releases workqueue_lock and gcwq->lock's.
3791  */
3792 void thaw_workqueues(void)
3793 {
3794         unsigned int cpu;
3795
3796         spin_lock(&workqueue_lock);
3797
3798         if (!workqueue_freezing)
3799                 goto out_unlock;
3800
3801         for_each_gcwq_cpu(cpu) {
3802                 struct global_cwq *gcwq = get_gcwq(cpu);
3803                 struct worker_pool *pool;
3804                 struct workqueue_struct *wq;
3805
3806                 spin_lock_irq(&gcwq->lock);
3807
3808                 BUG_ON(!(gcwq->flags & GCWQ_FREEZING));
3809                 gcwq->flags &= ~GCWQ_FREEZING;
3810
3811                 list_for_each_entry(wq, &workqueues, list) {
3812                         struct cpu_workqueue_struct *cwq = get_cwq(cpu, wq);
3813
3814                         if (!cwq || !(wq->flags & WQ_FREEZABLE))
3815                                 continue;
3816
3817                         /* restore max_active and repopulate worklist */
3818                         cwq->max_active = wq->saved_max_active;
3819
3820                         while (!list_empty(&cwq->delayed_works) &&
3821                                cwq->nr_active < cwq->max_active)
3822                                 cwq_activate_first_delayed(cwq);
3823                 }
3824
3825                 for_each_worker_pool(pool, gcwq)
3826                         wake_up_worker(pool);
3827
3828                 spin_unlock_irq(&gcwq->lock);
3829         }
3830
3831         workqueue_freezing = false;
3832 out_unlock:
3833         spin_unlock(&workqueue_lock);
3834 }
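/*
 * Simplified sketch of the call sequence expected from the freezer (the
 * real caller lives in kernel/power/process.c and also handles task
 * freezing, timeouts and error unwinding):
 *
 *	freeze_workqueues_begin();
 *	while (freeze_workqueues_busy())
 *		msleep(10);		// wait for in-flight works to finish
 *	...
 *	thaw_workqueues();		// on resume or on freeze failure
 */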
3835 #endif /* CONFIG_FREEZER */
3836
3837 static int __init init_workqueues(void)
3838 {
3839         unsigned int cpu;
3840         int i;
3841
3842         cpu_notifier(workqueue_cpu_callback, CPU_PRI_WORKQUEUE);
3843
3844         /* initialize gcwqs */
3845         for_each_gcwq_cpu(cpu) {
3846                 struct global_cwq *gcwq = get_gcwq(cpu);
3847                 struct worker_pool *pool;
3848
3849                 spin_lock_init(&gcwq->lock);
3850                 gcwq->cpu = cpu;
3851                 gcwq->flags |= GCWQ_DISASSOCIATED;
3852
3853                 for (i = 0; i < BUSY_WORKER_HASH_SIZE; i++)
3854                         INIT_HLIST_HEAD(&gcwq->busy_hash[i]);
3855
3856                 for_each_worker_pool(pool, gcwq) {
3857                         pool->gcwq = gcwq;
3858                         INIT_LIST_HEAD(&pool->worklist);
3859                         INIT_LIST_HEAD(&pool->idle_list);
3860
3861                         init_timer_deferrable(&pool->idle_timer);
3862                         pool->idle_timer.function = idle_worker_timeout;
3863                         pool->idle_timer.data = (unsigned long)pool;
3864
3865                         setup_timer(&pool->mayday_timer, gcwq_mayday_timeout,
3866                                     (unsigned long)pool);
3867
3868                         ida_init(&pool->worker_ida);
3869                 }
3870
3871                 gcwq->trustee_state = TRUSTEE_DONE;
3872                 init_waitqueue_head(&gcwq->trustee_wait);
3873         }
3874
3875         /* create the initial worker */
3876         for_each_online_gcwq_cpu(cpu) {
3877                 struct global_cwq *gcwq = get_gcwq(cpu);
3878                 struct worker_pool *pool;
3879
3880                 if (cpu != WORK_CPU_UNBOUND)
3881                         gcwq->flags &= ~GCWQ_DISASSOCIATED;
3882
3883                 for_each_worker_pool(pool, gcwq) {
3884                         struct worker *worker;
3885
3886                         worker = create_worker(pool, true);
3887                         BUG_ON(!worker);
3888                         spin_lock_irq(&gcwq->lock);
3889                         start_worker(worker);
3890                         spin_unlock_irq(&gcwq->lock);
3891                 }
3892         }
3893
3894         system_wq = alloc_workqueue("events", 0, 0);
3895         system_long_wq = alloc_workqueue("events_long", 0, 0);
3896         system_nrt_wq = alloc_workqueue("events_nrt", WQ_NON_REENTRANT, 0);
3897         system_unbound_wq = alloc_workqueue("events_unbound", WQ_UNBOUND,
3898                                             WQ_UNBOUND_MAX_ACTIVE);
3899         system_freezable_wq = alloc_workqueue("events_freezable",
3900                                               WQ_FREEZABLE, 0);
3901         system_nrt_freezable_wq = alloc_workqueue("events_nrt_freezable",
3902                         WQ_NON_REENTRANT | WQ_FREEZABLE, 0);
3903         BUG_ON(!system_wq || !system_long_wq || !system_nrt_wq ||
3904                !system_unbound_wq || !system_freezable_wq ||
3905                 !system_nrt_freezable_wq);
3906         return 0;
3907 }
3908 early_initcall(init_workqueues);
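/*
 * Illustrative sketch with hypothetical work items: most users do not
 * allocate their own workqueue and instead pick one of the system
 * workqueues created above according to the expected behaviour of their
 * works.
 *
 *	schedule_work(&short_work);			// system_wq
 *	queue_work(system_long_wq, &slow_work);		// may take a while
 *	queue_work(system_unbound_wq, &cpu_hog_work);	// no locality needed
 *	queue_work(system_freezable_wq, &fs_touching_work);
 */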