workqueue: Make unbound workqueues to use per-cpu pool_workqueues
[linux-2.6-block.git] / kernel / workqueue.c
... / ...
CommitLineData
1// SPDX-License-Identifier: GPL-2.0-only
2/*
3 * kernel/workqueue.c - generic async execution with shared worker pool
4 *
5 * Copyright (C) 2002 Ingo Molnar
6 *
7 * Derived from the taskqueue/keventd code by:
8 * David Woodhouse <dwmw2@infradead.org>
9 * Andrew Morton
10 * Kai Petzke <wpp@marie.physik.tu-berlin.de>
11 * Theodore Ts'o <tytso@mit.edu>
12 *
13 * Made to use alloc_percpu by Christoph Lameter.
14 *
15 * Copyright (C) 2010 SUSE Linux Products GmbH
16 * Copyright (C) 2010 Tejun Heo <tj@kernel.org>
17 *
18 * This is the generic async execution mechanism. Work items are
19 * executed in process context. The worker pool is shared and
20 * automatically managed. There are two worker pools for each CPU (one for
21 * normal work items and the other for high priority ones) and some extra
22 * pools for workqueues which are not bound to any specific CPU - the
23 * number of these backing pools is dynamic.
24 *
25 * Please read Documentation/core-api/workqueue.rst for details.
26 */
27
28#include <linux/export.h>
29#include <linux/kernel.h>
30#include <linux/sched.h>
31#include <linux/init.h>
32#include <linux/signal.h>
33#include <linux/completion.h>
34#include <linux/workqueue.h>
35#include <linux/slab.h>
36#include <linux/cpu.h>
37#include <linux/notifier.h>
38#include <linux/kthread.h>
39#include <linux/hardirq.h>
40#include <linux/mempolicy.h>
41#include <linux/freezer.h>
42#include <linux/debug_locks.h>
43#include <linux/lockdep.h>
44#include <linux/idr.h>
45#include <linux/jhash.h>
46#include <linux/hashtable.h>
47#include <linux/rculist.h>
48#include <linux/nodemask.h>
49#include <linux/moduleparam.h>
50#include <linux/uaccess.h>
51#include <linux/sched/isolation.h>
52#include <linux/sched/debug.h>
53#include <linux/nmi.h>
54#include <linux/kvm_para.h>
55#include <linux/delay.h>
56
57#include "workqueue_internal.h"
58
59enum {
60 /*
61 * worker_pool flags
62 *
63 * A bound pool is either associated or disassociated with its CPU.
64 * While associated (!DISASSOCIATED), all workers are bound to the
65 * CPU and none has %WORKER_UNBOUND set and concurrency management
66 * is in effect.
67 *
68 * While DISASSOCIATED, the cpu may be offline and all workers have
69 * %WORKER_UNBOUND set and concurrency management disabled, and may
70 * be executing on any CPU. The pool behaves as an unbound one.
71 *
72 * Note that DISASSOCIATED should be flipped only while holding
73 * wq_pool_attach_mutex to avoid changing binding state while
74 * worker_attach_to_pool() is in progress.
75 */
76 POOL_MANAGER_ACTIVE = 1 << 0, /* being managed */
77 POOL_DISASSOCIATED = 1 << 2, /* cpu can't serve workers */
78
79 /* worker flags */
80 WORKER_DIE = 1 << 1, /* die die die */
81 WORKER_IDLE = 1 << 2, /* is idle */
82 WORKER_PREP = 1 << 3, /* preparing to run works */
83 WORKER_CPU_INTENSIVE = 1 << 6, /* cpu intensive */
84 WORKER_UNBOUND = 1 << 7, /* worker is unbound */
85 WORKER_REBOUND = 1 << 8, /* worker was rebound */
86
87 WORKER_NOT_RUNNING = WORKER_PREP | WORKER_CPU_INTENSIVE |
88 WORKER_UNBOUND | WORKER_REBOUND,
89
90 NR_STD_WORKER_POOLS = 2, /* # standard pools per cpu */
91
92 UNBOUND_POOL_HASH_ORDER = 6, /* hashed by pool->attrs */
93 BUSY_WORKER_HASH_ORDER = 6, /* 64 pointers */
94
95 MAX_IDLE_WORKERS_RATIO = 4, /* 1/4 of busy can be idle */
96 IDLE_WORKER_TIMEOUT = 300 * HZ, /* keep idle ones for 5 mins */
97
98 MAYDAY_INITIAL_TIMEOUT = HZ / 100 >= 2 ? HZ / 100 : 2,
99 /* call for help after 10ms
100 (min two ticks) */
101 MAYDAY_INTERVAL = HZ / 10, /* and then every 100ms */
102 CREATE_COOLDOWN = HZ, /* time to breath after fail */
103
104 /*
105 * Rescue workers are used only on emergencies and shared by
106 * all cpus. Give MIN_NICE.
107 */
108 RESCUER_NICE_LEVEL = MIN_NICE,
109 HIGHPRI_NICE_LEVEL = MIN_NICE,
110
111 WQ_NAME_LEN = 24,
112};
113
114/*
115 * Structure fields follow one of the following exclusion rules.
116 *
117 * I: Modifiable by initialization/destruction paths and read-only for
118 * everyone else.
119 *
120 * P: Preemption protected. Disabling preemption is enough and should
121 * only be modified and accessed from the local cpu.
122 *
123 * L: pool->lock protected. Access with pool->lock held.
124 *
125 * K: Only modified by worker while holding pool->lock. Can be safely read by
126 * self, while holding pool->lock or from IRQ context if %current is the
127 * kworker.
128 *
129 * S: Only modified by worker self.
130 *
131 * A: wq_pool_attach_mutex protected.
132 *
133 * PL: wq_pool_mutex protected.
134 *
135 * PR: wq_pool_mutex protected for writes. RCU protected for reads.
136 *
137 * PW: wq_pool_mutex and wq->mutex protected for writes. Either for reads.
138 *
139 * PWR: wq_pool_mutex and wq->mutex protected for writes. Either or
140 * RCU for reads.
141 *
142 * WQ: wq->mutex protected.
143 *
144 * WR: wq->mutex protected for writes. RCU protected for reads.
145 *
146 * MD: wq_mayday_lock protected.
147 *
148 * WD: Used internally by the watchdog.
149 */
150
151/* struct worker is defined in workqueue_internal.h */
152
153struct worker_pool {
154 raw_spinlock_t lock; /* the pool lock */
155 int cpu; /* I: the associated cpu */
156 int node; /* I: the associated node ID */
157 int id; /* I: pool ID */
158 unsigned int flags; /* L: flags */
159
160 unsigned long watchdog_ts; /* L: watchdog timestamp */
161 bool cpu_stall; /* WD: stalled cpu bound pool */
162
163 /*
164 * The counter is incremented in a process context on the associated CPU
165 * w/ preemption disabled, and decremented or reset in the same context
166 * but w/ pool->lock held. The readers grab pool->lock and are
167 * guaranteed to see if the counter reached zero.
168 */
169 int nr_running;
170
171 struct list_head worklist; /* L: list of pending works */
172
173 int nr_workers; /* L: total number of workers */
174 int nr_idle; /* L: currently idle workers */
175
176 struct list_head idle_list; /* L: list of idle workers */
177 struct timer_list idle_timer; /* L: worker idle timeout */
178 struct work_struct idle_cull_work; /* L: worker idle cleanup */
179
180 struct timer_list mayday_timer; /* L: SOS timer for workers */
181
182 /* a workers is either on busy_hash or idle_list, or the manager */
183 DECLARE_HASHTABLE(busy_hash, BUSY_WORKER_HASH_ORDER);
184 /* L: hash of busy workers */
185
186 struct worker *manager; /* L: purely informational */
187 struct list_head workers; /* A: attached workers */
188 struct list_head dying_workers; /* A: workers about to die */
189 struct completion *detach_completion; /* all workers detached */
190
191 struct ida worker_ida; /* worker IDs for task name */
192
193 struct workqueue_attrs *attrs; /* I: worker attributes */
194 struct hlist_node hash_node; /* PL: unbound_pool_hash node */
195 int refcnt; /* PL: refcnt for unbound pools */
196
197 /*
198 * Destruction of pool is RCU protected to allow dereferences
199 * from get_work_pool().
200 */
201 struct rcu_head rcu;
202};
203
204/*
205 * Per-pool_workqueue statistics. These can be monitored using
206 * tools/workqueue/wq_monitor.py.
207 */
enum pool_workqueue_stats {
	PWQ_STAT_STARTED,	/* work items that started execution */
	PWQ_STAT_COMPLETED,	/* work items that completed execution */
	PWQ_STAT_CPU_TIME,	/* total CPU time consumed */
	PWQ_STAT_CPU_INTENSIVE,	/* wq_cpu_intensive_thresh_us violations */
	PWQ_STAT_CM_WAKEUP,	/* concurrency-management worker wakeups */
	PWQ_STAT_MAYDAY,	/* maydays to rescuer */
	PWQ_STAT_RESCUED,	/* linked work items executed by rescuer */

	/* number of counters above; sizes pool_workqueue->stats[] */
	PWQ_NR_STATS,
};
219
220/*
221 * The per-pool workqueue. While queued, the lower WORK_STRUCT_FLAG_BITS
222 * of work_struct->data are used for flags and the remaining high bits
223 * point to the pwq; thus, pwqs need to be aligned at two's power of the
224 * number of flag bits.
225 */
226struct pool_workqueue {
227 struct worker_pool *pool; /* I: the associated pool */
228 struct workqueue_struct *wq; /* I: the owning workqueue */
229 int work_color; /* L: current color */
230 int flush_color; /* L: flushing color */
231 int refcnt; /* L: reference count */
232 int nr_in_flight[WORK_NR_COLORS];
233 /* L: nr of in_flight works */
234
235 /*
236 * nr_active management and WORK_STRUCT_INACTIVE:
237 *
238 * When pwq->nr_active >= max_active, new work item is queued to
239 * pwq->inactive_works instead of pool->worklist and marked with
240 * WORK_STRUCT_INACTIVE.
241 *
242 * All work items marked with WORK_STRUCT_INACTIVE do not participate
243 * in pwq->nr_active and all work items in pwq->inactive_works are
244 * marked with WORK_STRUCT_INACTIVE. But not all WORK_STRUCT_INACTIVE
245 * work items are in pwq->inactive_works. Some of them are ready to
246 * run in pool->worklist or worker->scheduled. Those work itmes are
247 * only struct wq_barrier which is used for flush_work() and should
248 * not participate in pwq->nr_active. For non-barrier work item, it
249 * is marked with WORK_STRUCT_INACTIVE iff it is in pwq->inactive_works.
250 */
251 int nr_active; /* L: nr of active works */
252 int max_active; /* L: max active works */
253 struct list_head inactive_works; /* L: inactive works */
254 struct list_head pwqs_node; /* WR: node on wq->pwqs */
255 struct list_head mayday_node; /* MD: node on wq->maydays */
256
257 u64 stats[PWQ_NR_STATS];
258
259 /*
260 * Release of unbound pwq is punted to a kthread_worker. See put_pwq()
261 * and pwq_release_workfn() for details. pool_workqueue itself is also
262 * RCU protected so that the first pwq can be determined without
263 * grabbing wq->mutex.
264 */
265 struct kthread_work release_work;
266 struct rcu_head rcu;
267} __aligned(1 << WORK_STRUCT_FLAG_BITS);
268
269/*
270 * Structure used to wait for workqueue flush.
271 */
272struct wq_flusher {
273 struct list_head list; /* WQ: list of flushers */
274 int flush_color; /* WQ: flush color waiting for */
275 struct completion done; /* flush completion */
276};
277
278struct wq_device;
279
280/*
281 * The externally visible workqueue. It relays the issued work items to
282 * the appropriate worker_pool through its pool_workqueues.
283 */
284struct workqueue_struct {
285 struct list_head pwqs; /* WR: all pwqs of this wq */
286 struct list_head list; /* PR: list of all workqueues */
287
288 struct mutex mutex; /* protects this wq */
289 int work_color; /* WQ: current work color */
290 int flush_color; /* WQ: current flush color */
291 atomic_t nr_pwqs_to_flush; /* flush in progress */
292 struct wq_flusher *first_flusher; /* WQ: first flusher */
293 struct list_head flusher_queue; /* WQ: flush waiters */
294 struct list_head flusher_overflow; /* WQ: flush overflow list */
295
296 struct list_head maydays; /* MD: pwqs requesting rescue */
297 struct worker *rescuer; /* MD: rescue worker */
298
299 int nr_drainers; /* WQ: drain in progress */
300 int saved_max_active; /* WQ: saved pwq max_active */
301
302 struct workqueue_attrs *unbound_attrs; /* PW: only for unbound wqs */
303 struct pool_workqueue *dfl_pwq; /* PW: only for unbound wqs */
304
305#ifdef CONFIG_SYSFS
306 struct wq_device *wq_dev; /* I: for sysfs interface */
307#endif
308#ifdef CONFIG_LOCKDEP
309 char *lock_name;
310 struct lock_class_key key;
311 struct lockdep_map lockdep_map;
312#endif
313 char name[WQ_NAME_LEN]; /* I: workqueue name */
314
315 /*
316 * Destruction of workqueue_struct is RCU protected to allow walking
317 * the workqueues list without grabbing wq_pool_mutex.
318 * This is used to dump all workqueues from sysrq.
319 */
320 struct rcu_head rcu;
321
322 /* hot fields used during command issue, aligned to cacheline */
323 unsigned int flags ____cacheline_aligned; /* WQ: WQ_* flags */
324 struct pool_workqueue __percpu __rcu **cpu_pwq; /* I: per-cpu pwqs */
325};
326
327static struct kmem_cache *pwq_cache;
328
329static cpumask_var_t *wq_numa_possible_cpumask;
330 /* possible CPUs of each node */
331
332/*
333 * Per-cpu work items which run for longer than the following threshold are
334 * automatically considered CPU intensive and excluded from concurrency
335 * management to prevent them from noticeably delaying other per-cpu work items.
336 * ULONG_MAX indicates that the user hasn't overridden it with a boot parameter.
337 * The actual value is initialized in wq_cpu_intensive_thresh_init().
338 */
339static unsigned long wq_cpu_intensive_thresh_us = ULONG_MAX;
340module_param_named(cpu_intensive_thresh_us, wq_cpu_intensive_thresh_us, ulong, 0644);
341
342/* see the comment above the definition of WQ_POWER_EFFICIENT */
343static bool wq_power_efficient = IS_ENABLED(CONFIG_WQ_POWER_EFFICIENT_DEFAULT);
344module_param_named(power_efficient, wq_power_efficient, bool, 0444);
345
346static bool wq_online; /* can kworkers be created yet? */
347
348static bool wq_numa_enabled; /* unbound NUMA affinity enabled */
349
350/* buf for wq_update_unbound_numa_attrs(), protected by CPU hotplug exclusion */
351static struct workqueue_attrs *wq_update_unbound_numa_attrs_buf;
352
353static DEFINE_MUTEX(wq_pool_mutex); /* protects pools and workqueues list */
354static DEFINE_MUTEX(wq_pool_attach_mutex); /* protects worker attach/detach */
355static DEFINE_RAW_SPINLOCK(wq_mayday_lock); /* protects wq->maydays list */
356/* wait for manager to go away */
357static struct rcuwait manager_wait = __RCUWAIT_INITIALIZER(manager_wait);
358
359static LIST_HEAD(workqueues); /* PR: list of all workqueues */
360static bool workqueue_freezing; /* PL: have wqs started freezing? */
361
362/* PL&A: allowable cpus for unbound wqs and work items */
363static cpumask_var_t wq_unbound_cpumask;
364
365/* for further constrain wq_unbound_cpumask by cmdline parameter*/
366static struct cpumask wq_cmdline_cpumask __initdata;
367
368/* CPU where unbound work was last round robin scheduled from this CPU */
369static DEFINE_PER_CPU(int, wq_rr_cpu_last);
370
371/*
372 * Local execution of unbound work items is no longer guaranteed. The
373 * following always forces round-robin CPU selection on unbound work items
374 * to uncover usages which depend on it.
375 */
376#ifdef CONFIG_DEBUG_WQ_FORCE_RR_CPU
377static bool wq_debug_force_rr_cpu = true;
378#else
379static bool wq_debug_force_rr_cpu = false;
380#endif
381module_param_named(debug_force_rr_cpu, wq_debug_force_rr_cpu, bool, 0644);
382
383/* the per-cpu worker pools */
384static DEFINE_PER_CPU_SHARED_ALIGNED(struct worker_pool [NR_STD_WORKER_POOLS], cpu_worker_pools);
385
386static DEFINE_IDR(worker_pool_idr); /* PR: idr of all pools */
387
388/* PL: hash of all unbound pools keyed by pool->attrs */
389static DEFINE_HASHTABLE(unbound_pool_hash, UNBOUND_POOL_HASH_ORDER);
390
391/* I: attributes used when instantiating standard unbound pools on demand */
392static struct workqueue_attrs *unbound_std_wq_attrs[NR_STD_WORKER_POOLS];
393
394/* I: attributes used when instantiating ordered pools on demand */
395static struct workqueue_attrs *ordered_wq_attrs[NR_STD_WORKER_POOLS];
396
397/*
398 * I: kthread_worker to release pwq's. pwq release needs to be bounced to a
399 * process context while holding a pool lock. Bounce to a dedicated kthread
400 * worker to avoid A-A deadlocks.
401 */
402static struct kthread_worker *pwq_release_worker;
403
404struct workqueue_struct *system_wq __read_mostly;
405EXPORT_SYMBOL(system_wq);
406struct workqueue_struct *system_highpri_wq __read_mostly;
407EXPORT_SYMBOL_GPL(system_highpri_wq);
408struct workqueue_struct *system_long_wq __read_mostly;
409EXPORT_SYMBOL_GPL(system_long_wq);
410struct workqueue_struct *system_unbound_wq __read_mostly;
411EXPORT_SYMBOL_GPL(system_unbound_wq);
412struct workqueue_struct *system_freezable_wq __read_mostly;
413EXPORT_SYMBOL_GPL(system_freezable_wq);
414struct workqueue_struct *system_power_efficient_wq __read_mostly;
415EXPORT_SYMBOL_GPL(system_power_efficient_wq);
416struct workqueue_struct *system_freezable_power_efficient_wq __read_mostly;
417EXPORT_SYMBOL_GPL(system_freezable_power_efficient_wq);
418
/* forward declarations for functions that are used before their definition */
static void show_one_worker_pool(struct worker_pool *pool);
static void show_pwq(struct pool_workqueue *pwq);
static void workqueue_sysfs_unregister(struct workqueue_struct *wq);
static int worker_thread(void *__worker);
423
424#define CREATE_TRACE_POINTS
425#include <trace/events/workqueue.h>
426
/* warn (lockdep builds) unless RCU read lock or wq_pool_mutex is held */
#define assert_rcu_or_pool_mutex()					\
	RCU_LOCKDEP_WARN(!(rcu_read_lock_held() ||			\
			   lockdep_is_held(&wq_pool_mutex)),		\
			 "RCU or wq_pool_mutex should be held")
431
/*
 * Warn (lockdep builds) unless at least one of the RCU read lock,
 * @wq->mutex or wq_pool_mutex is held.  @wq is parenthesized so that any
 * pointer expression can safely be passed as the argument.
 */
#define assert_rcu_or_wq_mutex_or_pool_mutex(wq)			\
	RCU_LOCKDEP_WARN(!rcu_read_lock_held() &&			\
			 !lockdep_is_held(&(wq)->mutex) &&		\
			 !lockdep_is_held(&wq_pool_mutex),		\
			 "RCU, wq->mutex or wq_pool_mutex should be held")
437
/* iterate @pool over the NR_STD_WORKER_POOLS standard pools of @cpu */
#define for_each_cpu_worker_pool(pool, cpu)				\
	for ((pool) = per_cpu(cpu_worker_pools, cpu);			\
	     (pool) < per_cpu(cpu_worker_pools, cpu) + NR_STD_WORKER_POOLS; \
	     (pool)++)
442
/**
 * for_each_pool - iterate through all worker_pools in the system
 * @pool: iteration cursor
 * @pi: integer used for iteration
 *
 * The caller must either hold wq_pool_mutex or be inside an RCU read-side
 * critical section.  If the pool is to be used beyond the locking in
 * effect, it is the caller's job to guarantee that the pool stays online.
 *
 * The if/else clause is there solely to carry the lockdep assertion and
 * can otherwise be ignored.
 */
#define for_each_pool(pool, pi)						\
	idr_for_each_entry(&worker_pool_idr, (pool), (pi))		\
		if (({ assert_rcu_or_pool_mutex(); false; })) { }	\
		else
459
/**
 * for_each_pool_worker - iterate through all workers of a worker_pool
 * @worker: iteration cursor
 * @pool: worker_pool to iterate workers of
 *
 * The caller must hold wq_pool_attach_mutex.
 *
 * The if/else clause is there solely to carry the lockdep assertion and
 * can otherwise be ignored.
 */
#define for_each_pool_worker(worker, pool)				\
	list_for_each_entry((worker), &(pool)->workers, node)		\
		if (({ lockdep_assert_held(&wq_pool_attach_mutex); false; })) { } \
		else
474
/**
 * for_each_pwq - iterate through all pool_workqueues of the specified workqueue
 * @pwq: iteration cursor
 * @wq: the target workqueue
 *
 * This must be called either with wq->mutex held or RCU read locked.
 * If the pwq needs to be used beyond the locking in effect, the caller is
 * responsible for guaranteeing that the pwq stays online.
 *
 * The locking requirement is expressed by handing the wq->mutex lockdep
 * condition to list_for_each_entry_rcu().  @wq is parenthesized so that any
 * pointer expression can safely be passed as the argument.
 */
#define for_each_pwq(pwq, wq)						\
	list_for_each_entry_rcu((pwq), &(wq)->pwqs, pwqs_node,		\
				 lockdep_is_held(&(wq)->mutex))
490
491#ifdef CONFIG_DEBUG_OBJECTS_WORK
492
493static const struct debug_obj_descr work_debug_descr;
494
495static void *work_debug_hint(void *addr)
496{
497 return ((struct work_struct *) addr)->func;
498}
499
500static bool work_is_static_object(void *addr)
501{
502 struct work_struct *work = addr;
503
504 return test_bit(WORK_STRUCT_STATIC_BIT, work_data_bits(work));
505}
506
507/*
508 * fixup_init is called when:
509 * - an active object is initialized
510 */
511static bool work_fixup_init(void *addr, enum debug_obj_state state)
512{
513 struct work_struct *work = addr;
514
515 switch (state) {
516 case ODEBUG_STATE_ACTIVE:
517 cancel_work_sync(work);
518 debug_object_init(work, &work_debug_descr);
519 return true;
520 default:
521 return false;
522 }
523}
524
525/*
526 * fixup_free is called when:
527 * - an active object is freed
528 */
529static bool work_fixup_free(void *addr, enum debug_obj_state state)
530{
531 struct work_struct *work = addr;
532
533 switch (state) {
534 case ODEBUG_STATE_ACTIVE:
535 cancel_work_sync(work);
536 debug_object_free(work, &work_debug_descr);
537 return true;
538 default:
539 return false;
540 }
541}
542
543static const struct debug_obj_descr work_debug_descr = {
544 .name = "work_struct",
545 .debug_hint = work_debug_hint,
546 .is_static_object = work_is_static_object,
547 .fixup_init = work_fixup_init,
548 .fixup_free = work_fixup_free,
549};
550
551static inline void debug_work_activate(struct work_struct *work)
552{
553 debug_object_activate(work, &work_debug_descr);
554}
555
556static inline void debug_work_deactivate(struct work_struct *work)
557{
558 debug_object_deactivate(work, &work_debug_descr);
559}
560
561void __init_work(struct work_struct *work, int onstack)
562{
563 if (onstack)
564 debug_object_init_on_stack(work, &work_debug_descr);
565 else
566 debug_object_init(work, &work_debug_descr);
567}
568EXPORT_SYMBOL_GPL(__init_work);
569
570void destroy_work_on_stack(struct work_struct *work)
571{
572 debug_object_free(work, &work_debug_descr);
573}
574EXPORT_SYMBOL_GPL(destroy_work_on_stack);
575
576void destroy_delayed_work_on_stack(struct delayed_work *work)
577{
578 destroy_timer_on_stack(&work->timer);
579 debug_object_free(&work->work, &work_debug_descr);
580}
581EXPORT_SYMBOL_GPL(destroy_delayed_work_on_stack);
582
583#else
/* no-op stand-ins used when CONFIG_DEBUG_OBJECTS_WORK is not set */
static inline void debug_work_activate(struct work_struct *work)
{
}

static inline void debug_work_deactivate(struct work_struct *work)
{
}
586#endif
587
588/**
589 * worker_pool_assign_id - allocate ID and assign it to @pool
590 * @pool: the pool pointer of interest
591 *
592 * Returns 0 if ID in [0, WORK_OFFQ_POOL_NONE) is allocated and assigned
593 * successfully, -errno on failure.
594 */
595static int worker_pool_assign_id(struct worker_pool *pool)
596{
597 int ret;
598
599 lockdep_assert_held(&wq_pool_mutex);
600
601 ret = idr_alloc(&worker_pool_idr, pool, 0, WORK_OFFQ_POOL_NONE,
602 GFP_KERNEL);
603 if (ret >= 0) {
604 pool->id = ret;
605 return 0;
606 }
607 return ret;
608}
609
610static unsigned int work_color_to_flags(int color)
611{
612 return color << WORK_STRUCT_COLOR_SHIFT;
613}
614
615static int get_work_color(unsigned long work_data)
616{
617 return (work_data >> WORK_STRUCT_COLOR_SHIFT) &
618 ((1 << WORK_STRUCT_COLOR_BITS) - 1);
619}
620
621static int work_next_color(int color)
622{
623 return (color + 1) % WORK_NR_COLORS;
624}
625
626/*
627 * While queued, %WORK_STRUCT_PWQ is set and non flag bits of a work's data
628 * contain the pointer to the queued pwq. Once execution starts, the flag
629 * is cleared and the high bits contain OFFQ flags and pool ID.
630 *
631 * set_work_pwq(), set_work_pool_and_clear_pending(), mark_work_canceling()
632 * and clear_work_data() can be used to set the pwq, pool or clear
633 * work->data. These functions should only be called while the work is
634 * owned - ie. while the PENDING bit is set.
635 *
636 * get_work_pool() and get_work_pwq() can be used to obtain the pool or pwq
637 * corresponding to a work. Pool is available once the work has been
638 * queued anywhere after initialization until it is sync canceled. pwq is
639 * available only while the work item is queued.
640 *
641 * %WORK_OFFQ_CANCELING is used to mark a work item which is being
642 * canceled. While being canceled, a work item may have its PENDING set
643 * but stay off timer and worklist for arbitrarily long and nobody should
644 * try to steal the PENDING bit.
645 */
646static inline void set_work_data(struct work_struct *work, unsigned long data,
647 unsigned long flags)
648{
649 WARN_ON_ONCE(!work_pending(work));
650 atomic_long_set(&work->data, data | flags | work_static(work));
651}
652
653static void set_work_pwq(struct work_struct *work, struct pool_workqueue *pwq,
654 unsigned long extra_flags)
655{
656 set_work_data(work, (unsigned long)pwq,
657 WORK_STRUCT_PENDING | WORK_STRUCT_PWQ | extra_flags);
658}
659
660static void set_work_pool_and_keep_pending(struct work_struct *work,
661 int pool_id)
662{
663 set_work_data(work, (unsigned long)pool_id << WORK_OFFQ_POOL_SHIFT,
664 WORK_STRUCT_PENDING);
665}
666
667static void set_work_pool_and_clear_pending(struct work_struct *work,
668 int pool_id)
669{
670 /*
671 * The following wmb is paired with the implied mb in
672 * test_and_set_bit(PENDING) and ensures all updates to @work made
673 * here are visible to and precede any updates by the next PENDING
674 * owner.
675 */
676 smp_wmb();
677 set_work_data(work, (unsigned long)pool_id << WORK_OFFQ_POOL_SHIFT, 0);
678 /*
679 * The following mb guarantees that previous clear of a PENDING bit
680 * will not be reordered with any speculative LOADS or STORES from
681 * work->current_func, which is executed afterwards. This possible
682 * reordering can lead to a missed execution on attempt to queue
683 * the same @work. E.g. consider this case:
684 *
685 * CPU#0 CPU#1
686 * ---------------------------- --------------------------------
687 *
688 * 1 STORE event_indicated
689 * 2 queue_work_on() {
690 * 3 test_and_set_bit(PENDING)
691 * 4 } set_..._and_clear_pending() {
692 * 5 set_work_data() # clear bit
693 * 6 smp_mb()
694 * 7 work->current_func() {
695 * 8 LOAD event_indicated
696 * }
697 *
698 * Without an explicit full barrier speculative LOAD on line 8 can
699 * be executed before CPU#0 does STORE on line 1. If that happens,
700 * CPU#0 observes the PENDING bit is still set and new execution of
701 * a @work is not queued in a hope, that CPU#1 will eventually
702 * finish the queued @work. Meanwhile CPU#1 does not see
703 * event_indicated is set, because speculative LOAD was executed
704 * before actual STORE.
705 */
706 smp_mb();
707}
708
709static void clear_work_data(struct work_struct *work)
710{
711 smp_wmb(); /* see set_work_pool_and_clear_pending() */
712 set_work_data(work, WORK_STRUCT_NO_POOL, 0);
713}
714
715static inline struct pool_workqueue *work_struct_pwq(unsigned long data)
716{
717 return (struct pool_workqueue *)(data & WORK_STRUCT_WQ_DATA_MASK);
718}
719
720static struct pool_workqueue *get_work_pwq(struct work_struct *work)
721{
722 unsigned long data = atomic_long_read(&work->data);
723
724 if (data & WORK_STRUCT_PWQ)
725 return work_struct_pwq(data);
726 else
727 return NULL;
728}
729
730/**
731 * get_work_pool - return the worker_pool a given work was associated with
732 * @work: the work item of interest
733 *
734 * Pools are created and destroyed under wq_pool_mutex, and allows read
735 * access under RCU read lock. As such, this function should be
736 * called under wq_pool_mutex or inside of a rcu_read_lock() region.
737 *
738 * All fields of the returned pool are accessible as long as the above
739 * mentioned locking is in effect. If the returned pool needs to be used
740 * beyond the critical section, the caller is responsible for ensuring the
741 * returned pool is and stays online.
742 *
743 * Return: The worker_pool @work was last associated with. %NULL if none.
744 */
745static struct worker_pool *get_work_pool(struct work_struct *work)
746{
747 unsigned long data = atomic_long_read(&work->data);
748 int pool_id;
749
750 assert_rcu_or_pool_mutex();
751
752 if (data & WORK_STRUCT_PWQ)
753 return work_struct_pwq(data)->pool;
754
755 pool_id = data >> WORK_OFFQ_POOL_SHIFT;
756 if (pool_id == WORK_OFFQ_POOL_NONE)
757 return NULL;
758
759 return idr_find(&worker_pool_idr, pool_id);
760}
761
762/**
763 * get_work_pool_id - return the worker pool ID a given work is associated with
764 * @work: the work item of interest
765 *
766 * Return: The worker_pool ID @work was last associated with.
767 * %WORK_OFFQ_POOL_NONE if none.
768 */
769static int get_work_pool_id(struct work_struct *work)
770{
771 unsigned long data = atomic_long_read(&work->data);
772
773 if (data & WORK_STRUCT_PWQ)
774 return work_struct_pwq(data)->pool->id;
775
776 return data >> WORK_OFFQ_POOL_SHIFT;
777}
778
779static void mark_work_canceling(struct work_struct *work)
780{
781 unsigned long pool_id = get_work_pool_id(work);
782
783 pool_id <<= WORK_OFFQ_POOL_SHIFT;
784 set_work_data(work, pool_id | WORK_OFFQ_CANCELING, WORK_STRUCT_PENDING);
785}
786
787static bool work_is_canceling(struct work_struct *work)
788{
789 unsigned long data = atomic_long_read(&work->data);
790
791 return !(data & WORK_STRUCT_PWQ) && (data & WORK_OFFQ_CANCELING);
792}
793
794/*
795 * Policy functions. These define the policies on how the global worker
796 * pools are managed. Unless noted otherwise, these functions assume that
797 * they're being called with pool->lock held.
798 */
799
800static bool __need_more_worker(struct worker_pool *pool)
801{
802 return !pool->nr_running;
803}
804
805/*
806 * Need to wake up a worker? Called from anything but currently
807 * running workers.
808 *
809 * Note that, because unbound workers never contribute to nr_running, this
810 * function will always return %true for unbound pools as long as the
811 * worklist isn't empty.
812 */
813static bool need_more_worker(struct worker_pool *pool)
814{
815 return !list_empty(&pool->worklist) && __need_more_worker(pool);
816}
817
818/* Can I start working? Called from busy but !running workers. */
819static bool may_start_working(struct worker_pool *pool)
820{
821 return pool->nr_idle;
822}
823
824/* Do I need to keep working? Called from currently running workers. */
825static bool keep_working(struct worker_pool *pool)
826{
827 return !list_empty(&pool->worklist) && (pool->nr_running <= 1);
828}
829
830/* Do we need a new worker? Called from manager. */
831static bool need_to_create_worker(struct worker_pool *pool)
832{
833 return need_more_worker(pool) && !may_start_working(pool);
834}
835
836/* Do we have too many workers and should some go away? */
837static bool too_many_workers(struct worker_pool *pool)
838{
839 bool managing = pool->flags & POOL_MANAGER_ACTIVE;
840 int nr_idle = pool->nr_idle + managing; /* manager is considered idle */
841 int nr_busy = pool->nr_workers - nr_idle;
842
843 return nr_idle > 2 && (nr_idle - 2) * MAX_IDLE_WORKERS_RATIO >= nr_busy;
844}
845
846/**
847 * worker_set_flags - set worker flags and adjust nr_running accordingly
848 * @worker: self
849 * @flags: flags to set
850 *
851 * Set @flags in @worker->flags and adjust nr_running accordingly.
852 */
853static inline void worker_set_flags(struct worker *worker, unsigned int flags)
854{
855 struct worker_pool *pool = worker->pool;
856
857 lockdep_assert_held(&pool->lock);
858
859 /* If transitioning into NOT_RUNNING, adjust nr_running. */
860 if ((flags & WORKER_NOT_RUNNING) &&
861 !(worker->flags & WORKER_NOT_RUNNING)) {
862 pool->nr_running--;
863 }
864
865 worker->flags |= flags;
866}
867
868/**
869 * worker_clr_flags - clear worker flags and adjust nr_running accordingly
870 * @worker: self
871 * @flags: flags to clear
872 *
873 * Clear @flags in @worker->flags and adjust nr_running accordingly.
874 */
875static inline void worker_clr_flags(struct worker *worker, unsigned int flags)
876{
877 struct worker_pool *pool = worker->pool;
878 unsigned int oflags = worker->flags;
879
880 lockdep_assert_held(&pool->lock);
881
882 worker->flags &= ~flags;
883
884 /*
885 * If transitioning out of NOT_RUNNING, increment nr_running. Note
886 * that the nested NOT_RUNNING is not a noop. NOT_RUNNING is mask
887 * of multiple flags, not a single flag.
888 */
889 if ((flags & WORKER_NOT_RUNNING) && (oflags & WORKER_NOT_RUNNING))
890 if (!(worker->flags & WORKER_NOT_RUNNING))
891 pool->nr_running++;
892}
893
894/* Return the first idle worker. Called with pool->lock held. */
895static struct worker *first_idle_worker(struct worker_pool *pool)
896{
897 if (unlikely(list_empty(&pool->idle_list)))
898 return NULL;
899
900 return list_first_entry(&pool->idle_list, struct worker, entry);
901}
902
903/**
904 * worker_enter_idle - enter idle state
905 * @worker: worker which is entering idle state
906 *
907 * @worker is entering idle state. Update stats and idle timer if
908 * necessary.
909 *
910 * LOCKING:
911 * raw_spin_lock_irq(pool->lock).
912 */
913static void worker_enter_idle(struct worker *worker)
914{
915 struct worker_pool *pool = worker->pool;
916
917 if (WARN_ON_ONCE(worker->flags & WORKER_IDLE) ||
918 WARN_ON_ONCE(!list_empty(&worker->entry) &&
919 (worker->hentry.next || worker->hentry.pprev)))
920 return;
921
922 /* can't use worker_set_flags(), also called from create_worker() */
923 worker->flags |= WORKER_IDLE;
924 pool->nr_idle++;
925 worker->last_active = jiffies;
926
927 /* idle_list is LIFO */
928 list_add(&worker->entry, &pool->idle_list);
929
930 if (too_many_workers(pool) && !timer_pending(&pool->idle_timer))
931 mod_timer(&pool->idle_timer, jiffies + IDLE_WORKER_TIMEOUT);
932
933 /* Sanity check nr_running. */
934 WARN_ON_ONCE(pool->nr_workers == pool->nr_idle && pool->nr_running);
935}
936
937/**
938 * worker_leave_idle - leave idle state
939 * @worker: worker which is leaving idle state
940 *
941 * @worker is leaving idle state. Update stats.
942 *
943 * LOCKING:
944 * raw_spin_lock_irq(pool->lock).
945 */
946static void worker_leave_idle(struct worker *worker)
947{
948 struct worker_pool *pool = worker->pool;
949
950 if (WARN_ON_ONCE(!(worker->flags & WORKER_IDLE)))
951 return;
952 worker_clr_flags(worker, WORKER_IDLE);
953 pool->nr_idle--;
954 list_del_init(&worker->entry);
955}
956
957/**
958 * find_worker_executing_work - find worker which is executing a work
959 * @pool: pool of interest
960 * @work: work to find worker for
961 *
962 * Find a worker which is executing @work on @pool by searching
963 * @pool->busy_hash which is keyed by the address of @work. For a worker
964 * to match, its current execution should match the address of @work and
965 * its work function. This is to avoid unwanted dependency between
966 * unrelated work executions through a work item being recycled while still
967 * being executed.
968 *
969 * This is a bit tricky. A work item may be freed once its execution
970 * starts and nothing prevents the freed area from being recycled for
971 * another work item. If the same work item address ends up being reused
972 * before the original execution finishes, workqueue will identify the
973 * recycled work item as currently executing and make it wait until the
974 * current execution finishes, introducing an unwanted dependency.
975 *
976 * This function checks the work item address and work function to avoid
977 * false positives. Note that this isn't complete as one may construct a
978 * work function which can introduce dependency onto itself through a
979 * recycled work item. Well, if somebody wants to shoot oneself in the
980 * foot that badly, there's only so much we can do, and if such deadlock
981 * actually occurs, it should be easy to locate the culprit work function.
982 *
983 * CONTEXT:
984 * raw_spin_lock_irq(pool->lock).
985 *
986 * Return:
987 * Pointer to worker which is executing @work if found, %NULL
988 * otherwise.
989 */
990static struct worker *find_worker_executing_work(struct worker_pool *pool,
991 struct work_struct *work)
992{
993 struct worker *worker;
994
995 hash_for_each_possible(pool->busy_hash, worker, hentry,
996 (unsigned long)work)
997 if (worker->current_work == work &&
998 worker->current_func == work->func)
999 return worker;
1000
1001 return NULL;
1002}
1003
1004/**
1005 * move_linked_works - move linked works to a list
1006 * @work: start of series of works to be scheduled
1007 * @head: target list to append @work to
1008 * @nextp: out parameter for nested worklist walking
1009 *
1010 * Schedule linked works starting from @work to @head. Work series to
1011 * be scheduled starts at @work and includes any consecutive work with
1012 * WORK_STRUCT_LINKED set in its predecessor.
1013 *
1014 * If @nextp is not NULL, it's updated to point to the next work of
1015 * the last scheduled work. This allows move_linked_works() to be
1016 * nested inside outer list_for_each_entry_safe().
1017 *
1018 * CONTEXT:
1019 * raw_spin_lock_irq(pool->lock).
1020 */
1021static void move_linked_works(struct work_struct *work, struct list_head *head,
1022 struct work_struct **nextp)
1023{
1024 struct work_struct *n;
1025
1026 /*
1027 * Linked worklist will always end before the end of the list,
1028 * use NULL for list head.
1029 */
1030 list_for_each_entry_safe_from(work, n, NULL, entry) {
1031 list_move_tail(&work->entry, head);
1032 if (!(*work_data_bits(work) & WORK_STRUCT_LINKED))
1033 break;
1034 }
1035
1036 /*
1037 * If we're already inside safe list traversal and have moved
1038 * multiple works to the scheduled queue, the next position
1039 * needs to be updated.
1040 */
1041 if (nextp)
1042 *nextp = n;
1043}
1044
1045/**
1046 * wake_up_worker - wake up an idle worker
1047 * @pool: worker pool to wake worker from
1048 *
1049 * Wake up the first idle worker of @pool.
1050 *
1051 * CONTEXT:
1052 * raw_spin_lock_irq(pool->lock).
1053 */
1054static void wake_up_worker(struct worker_pool *pool)
1055{
1056 struct worker *worker = first_idle_worker(pool);
1057
1058 if (likely(worker))
1059 wake_up_process(worker->task);
1060}
1061
1062#ifdef CONFIG_WQ_CPU_INTENSIVE_REPORT
1063
1064/*
1065 * Concurrency-managed per-cpu work items that hog CPU for longer than
1066 * wq_cpu_intensive_thresh_us trigger the automatic CPU_INTENSIVE mechanism,
1067 * which prevents them from stalling other concurrency-managed work items. If a
1068 * work function keeps triggering this mechanism, it's likely that the work item
1069 * should be using an unbound workqueue instead.
1070 *
1071 * wq_cpu_intensive_report() tracks work functions which trigger such conditions
1072 * and report them so that they can be examined and converted to use unbound
1073 * workqueues as appropriate. To avoid flooding the console, each violating work
1074 * function is tracked and reported with exponential backoff.
1075 */
1076#define WCI_MAX_ENTS 128
1077
1078struct wci_ent {
1079 work_func_t func;
1080 atomic64_t cnt;
1081 struct hlist_node hash_node;
1082};
1083
1084static struct wci_ent wci_ents[WCI_MAX_ENTS];
1085static int wci_nr_ents;
1086static DEFINE_RAW_SPINLOCK(wci_lock);
1087static DEFINE_HASHTABLE(wci_hash, ilog2(WCI_MAX_ENTS));
1088
1089static struct wci_ent *wci_find_ent(work_func_t func)
1090{
1091 struct wci_ent *ent;
1092
1093 hash_for_each_possible_rcu(wci_hash, ent, hash_node,
1094 (unsigned long)func) {
1095 if (ent->func == func)
1096 return ent;
1097 }
1098 return NULL;
1099}
1100
1101static void wq_cpu_intensive_report(work_func_t func)
1102{
1103 struct wci_ent *ent;
1104
1105restart:
1106 ent = wci_find_ent(func);
1107 if (ent) {
1108 u64 cnt;
1109
1110 /*
1111 * Start reporting from the fourth time and back off
1112 * exponentially.
1113 */
1114 cnt = atomic64_inc_return_relaxed(&ent->cnt);
1115 if (cnt >= 4 && is_power_of_2(cnt))
1116 printk_deferred(KERN_WARNING "workqueue: %ps hogged CPU for >%luus %llu times, consider switching to WQ_UNBOUND\n",
1117 ent->func, wq_cpu_intensive_thresh_us,
1118 atomic64_read(&ent->cnt));
1119 return;
1120 }
1121
1122 /*
1123 * @func is a new violation. Allocate a new entry for it. If wcn_ents[]
1124 * is exhausted, something went really wrong and we probably made enough
1125 * noise already.
1126 */
1127 if (wci_nr_ents >= WCI_MAX_ENTS)
1128 return;
1129
1130 raw_spin_lock(&wci_lock);
1131
1132 if (wci_nr_ents >= WCI_MAX_ENTS) {
1133 raw_spin_unlock(&wci_lock);
1134 return;
1135 }
1136
1137 if (wci_find_ent(func)) {
1138 raw_spin_unlock(&wci_lock);
1139 goto restart;
1140 }
1141
1142 ent = &wci_ents[wci_nr_ents++];
1143 ent->func = func;
1144 atomic64_set(&ent->cnt, 1);
1145 hash_add_rcu(wci_hash, &ent->hash_node, (unsigned long)func);
1146
1147 raw_spin_unlock(&wci_lock);
1148}
1149
1150#else /* CONFIG_WQ_CPU_INTENSIVE_REPORT */
1151static void wq_cpu_intensive_report(work_func_t func) {}
1152#endif /* CONFIG_WQ_CPU_INTENSIVE_REPORT */
1153
1154/**
1155 * wq_worker_running - a worker is running again
1156 * @task: task waking up
1157 *
1158 * This function is called when a worker returns from schedule()
1159 */
1160void wq_worker_running(struct task_struct *task)
1161{
1162 struct worker *worker = kthread_data(task);
1163
1164 if (!READ_ONCE(worker->sleeping))
1165 return;
1166
1167 /*
1168 * If preempted by unbind_workers() between the WORKER_NOT_RUNNING check
1169 * and the nr_running increment below, we may ruin the nr_running reset
1170 * and leave with an unexpected pool->nr_running == 1 on the newly unbound
1171 * pool. Protect against such race.
1172 */
1173 preempt_disable();
1174 if (!(worker->flags & WORKER_NOT_RUNNING))
1175 worker->pool->nr_running++;
1176 preempt_enable();
1177
1178 /*
1179 * CPU intensive auto-detection cares about how long a work item hogged
1180 * CPU without sleeping. Reset the starting timestamp on wakeup.
1181 */
1182 worker->current_at = worker->task->se.sum_exec_runtime;
1183
1184 WRITE_ONCE(worker->sleeping, 0);
1185}
1186
1187/**
1188 * wq_worker_sleeping - a worker is going to sleep
1189 * @task: task going to sleep
1190 *
1191 * This function is called from schedule() when a busy worker is
1192 * going to sleep.
1193 */
1194void wq_worker_sleeping(struct task_struct *task)
1195{
1196 struct worker *worker = kthread_data(task);
1197 struct worker_pool *pool;
1198
1199 /*
1200 * Rescuers, which may not have all the fields set up like normal
1201 * workers, also reach here, let's not access anything before
1202 * checking NOT_RUNNING.
1203 */
1204 if (worker->flags & WORKER_NOT_RUNNING)
1205 return;
1206
1207 pool = worker->pool;
1208
1209 /* Return if preempted before wq_worker_running() was reached */
1210 if (READ_ONCE(worker->sleeping))
1211 return;
1212
1213 WRITE_ONCE(worker->sleeping, 1);
1214 raw_spin_lock_irq(&pool->lock);
1215
1216 /*
1217 * Recheck in case unbind_workers() preempted us. We don't
1218 * want to decrement nr_running after the worker is unbound
1219 * and nr_running has been reset.
1220 */
1221 if (worker->flags & WORKER_NOT_RUNNING) {
1222 raw_spin_unlock_irq(&pool->lock);
1223 return;
1224 }
1225
1226 pool->nr_running--;
1227 if (need_more_worker(pool)) {
1228 worker->current_pwq->stats[PWQ_STAT_CM_WAKEUP]++;
1229 wake_up_worker(pool);
1230 }
1231 raw_spin_unlock_irq(&pool->lock);
1232}
1233
1234/**
1235 * wq_worker_tick - a scheduler tick occurred while a kworker is running
1236 * @task: task currently running
1237 *
1238 * Called from scheduler_tick(). We're in the IRQ context and the current
1239 * worker's fields which follow the 'K' locking rule can be accessed safely.
1240 */
1241void wq_worker_tick(struct task_struct *task)
1242{
1243 struct worker *worker = kthread_data(task);
1244 struct pool_workqueue *pwq = worker->current_pwq;
1245 struct worker_pool *pool = worker->pool;
1246
1247 if (!pwq)
1248 return;
1249
1250 pwq->stats[PWQ_STAT_CPU_TIME] += TICK_USEC;
1251
1252 if (!wq_cpu_intensive_thresh_us)
1253 return;
1254
1255 /*
1256 * If the current worker is concurrency managed and hogged the CPU for
1257 * longer than wq_cpu_intensive_thresh_us, it's automatically marked
1258 * CPU_INTENSIVE to avoid stalling other concurrency-managed work items.
1259 *
1260 * Set @worker->sleeping means that @worker is in the process of
1261 * switching out voluntarily and won't be contributing to
1262 * @pool->nr_running until it wakes up. As wq_worker_sleeping() also
1263 * decrements ->nr_running, setting CPU_INTENSIVE here can lead to
1264 * double decrements. The task is releasing the CPU anyway. Let's skip.
1265 * We probably want to make this prettier in the future.
1266 */
1267 if ((worker->flags & WORKER_NOT_RUNNING) || READ_ONCE(worker->sleeping) ||
1268 worker->task->se.sum_exec_runtime - worker->current_at <
1269 wq_cpu_intensive_thresh_us * NSEC_PER_USEC)
1270 return;
1271
1272 raw_spin_lock(&pool->lock);
1273
1274 worker_set_flags(worker, WORKER_CPU_INTENSIVE);
1275 wq_cpu_intensive_report(worker->current_func);
1276 pwq->stats[PWQ_STAT_CPU_INTENSIVE]++;
1277
1278 if (need_more_worker(pool)) {
1279 pwq->stats[PWQ_STAT_CM_WAKEUP]++;
1280 wake_up_worker(pool);
1281 }
1282
1283 raw_spin_unlock(&pool->lock);
1284}
1285
1286/**
1287 * wq_worker_last_func - retrieve worker's last work function
1288 * @task: Task to retrieve last work function of.
1289 *
1290 * Determine the last function a worker executed. This is called from
1291 * the scheduler to get a worker's last known identity.
1292 *
1293 * CONTEXT:
1294 * raw_spin_lock_irq(rq->lock)
1295 *
1296 * This function is called during schedule() when a kworker is going
1297 * to sleep. It's used by psi to identify aggregation workers during
1298 * dequeuing, to allow periodic aggregation to shut-off when that
1299 * worker is the last task in the system or cgroup to go to sleep.
1300 *
1301 * As this function doesn't involve any workqueue-related locking, it
1302 * only returns stable values when called from inside the scheduler's
1303 * queuing and dequeuing paths, when @task, which must be a kworker,
1304 * is guaranteed to not be processing any works.
1305 *
1306 * Return:
1307 * The last work function %current executed as a worker, NULL if it
1308 * hasn't executed any work yet.
1309 */
1310work_func_t wq_worker_last_func(struct task_struct *task)
1311{
1312 struct worker *worker = kthread_data(task);
1313
1314 return worker->last_func;
1315}
1316
1317/**
1318 * get_pwq - get an extra reference on the specified pool_workqueue
1319 * @pwq: pool_workqueue to get
1320 *
1321 * Obtain an extra reference on @pwq. The caller should guarantee that
1322 * @pwq has positive refcnt and be holding the matching pool->lock.
1323 */
1324static void get_pwq(struct pool_workqueue *pwq)
1325{
1326 lockdep_assert_held(&pwq->pool->lock);
1327 WARN_ON_ONCE(pwq->refcnt <= 0);
1328 pwq->refcnt++;
1329}
1330
1331/**
1332 * put_pwq - put a pool_workqueue reference
1333 * @pwq: pool_workqueue to put
1334 *
1335 * Drop a reference of @pwq. If its refcnt reaches zero, schedule its
1336 * destruction. The caller should be holding the matching pool->lock.
1337 */
1338static void put_pwq(struct pool_workqueue *pwq)
1339{
1340 lockdep_assert_held(&pwq->pool->lock);
1341 if (likely(--pwq->refcnt))
1342 return;
1343 /*
1344 * @pwq can't be released under pool->lock, bounce to a dedicated
1345 * kthread_worker to avoid A-A deadlocks.
1346 */
1347 kthread_queue_work(pwq_release_worker, &pwq->release_work);
1348}
1349
1350/**
1351 * put_pwq_unlocked - put_pwq() with surrounding pool lock/unlock
1352 * @pwq: pool_workqueue to put (can be %NULL)
1353 *
1354 * put_pwq() with locking. This function also allows %NULL @pwq.
1355 */
1356static void put_pwq_unlocked(struct pool_workqueue *pwq)
1357{
1358 if (pwq) {
1359 /*
1360 * As both pwqs and pools are RCU protected, the
1361 * following lock operations are safe.
1362 */
1363 raw_spin_lock_irq(&pwq->pool->lock);
1364 put_pwq(pwq);
1365 raw_spin_unlock_irq(&pwq->pool->lock);
1366 }
1367}
1368
1369static void pwq_activate_inactive_work(struct work_struct *work)
1370{
1371 struct pool_workqueue *pwq = get_work_pwq(work);
1372
1373 trace_workqueue_activate_work(work);
1374 if (list_empty(&pwq->pool->worklist))
1375 pwq->pool->watchdog_ts = jiffies;
1376 move_linked_works(work, &pwq->pool->worklist, NULL);
1377 __clear_bit(WORK_STRUCT_INACTIVE_BIT, work_data_bits(work));
1378 pwq->nr_active++;
1379}
1380
1381static void pwq_activate_first_inactive(struct pool_workqueue *pwq)
1382{
1383 struct work_struct *work = list_first_entry(&pwq->inactive_works,
1384 struct work_struct, entry);
1385
1386 pwq_activate_inactive_work(work);
1387}
1388
1389/**
1390 * pwq_dec_nr_in_flight - decrement pwq's nr_in_flight
1391 * @pwq: pwq of interest
1392 * @work_data: work_data of work which left the queue
1393 *
1394 * A work either has completed or is removed from pending queue,
1395 * decrement nr_in_flight of its pwq and handle workqueue flushing.
1396 *
1397 * CONTEXT:
1398 * raw_spin_lock_irq(pool->lock).
1399 */
1400static void pwq_dec_nr_in_flight(struct pool_workqueue *pwq, unsigned long work_data)
1401{
1402 int color = get_work_color(work_data);
1403
1404 if (!(work_data & WORK_STRUCT_INACTIVE)) {
1405 pwq->nr_active--;
1406 if (!list_empty(&pwq->inactive_works)) {
1407 /* one down, submit an inactive one */
1408 if (pwq->nr_active < pwq->max_active)
1409 pwq_activate_first_inactive(pwq);
1410 }
1411 }
1412
1413 pwq->nr_in_flight[color]--;
1414
1415 /* is flush in progress and are we at the flushing tip? */
1416 if (likely(pwq->flush_color != color))
1417 goto out_put;
1418
1419 /* are there still in-flight works? */
1420 if (pwq->nr_in_flight[color])
1421 goto out_put;
1422
1423 /* this pwq is done, clear flush_color */
1424 pwq->flush_color = -1;
1425
1426 /*
1427 * If this was the last pwq, wake up the first flusher. It
1428 * will handle the rest.
1429 */
1430 if (atomic_dec_and_test(&pwq->wq->nr_pwqs_to_flush))
1431 complete(&pwq->wq->first_flusher->done);
1432out_put:
1433 put_pwq(pwq);
1434}
1435
1436/**
1437 * try_to_grab_pending - steal work item from worklist and disable irq
1438 * @work: work item to steal
1439 * @is_dwork: @work is a delayed_work
1440 * @flags: place to store irq state
1441 *
1442 * Try to grab PENDING bit of @work. This function can handle @work in any
1443 * stable state - idle, on timer or on worklist.
1444 *
1445 * Return:
1446 *
1447 * ======== ================================================================
1448 * 1 if @work was pending and we successfully stole PENDING
1449 * 0 if @work was idle and we claimed PENDING
1450 * -EAGAIN if PENDING couldn't be grabbed at the moment, safe to busy-retry
1451 * -ENOENT if someone else is canceling @work, this state may persist
1452 * for arbitrarily long
1453 * ======== ================================================================
1454 *
1455 * Note:
1456 * On >= 0 return, the caller owns @work's PENDING bit. To avoid getting
1457 * interrupted while holding PENDING and @work off queue, irq must be
1458 * disabled on entry. This, combined with delayed_work->timer being
1459 * irqsafe, ensures that we return -EAGAIN for finite short period of time.
1460 *
1461 * On successful return, >= 0, irq is disabled and the caller is
1462 * responsible for releasing it using local_irq_restore(*@flags).
1463 *
1464 * This function is safe to call from any context including IRQ handler.
1465 */
1466static int try_to_grab_pending(struct work_struct *work, bool is_dwork,
1467 unsigned long *flags)
1468{
1469 struct worker_pool *pool;
1470 struct pool_workqueue *pwq;
1471
1472 local_irq_save(*flags);
1473
1474 /* try to steal the timer if it exists */
1475 if (is_dwork) {
1476 struct delayed_work *dwork = to_delayed_work(work);
1477
1478 /*
1479 * dwork->timer is irqsafe. If del_timer() fails, it's
1480 * guaranteed that the timer is not queued anywhere and not
1481 * running on the local CPU.
1482 */
1483 if (likely(del_timer(&dwork->timer)))
1484 return 1;
1485 }
1486
1487 /* try to claim PENDING the normal way */
1488 if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work)))
1489 return 0;
1490
1491 rcu_read_lock();
1492 /*
1493 * The queueing is in progress, or it is already queued. Try to
1494 * steal it from ->worklist without clearing WORK_STRUCT_PENDING.
1495 */
1496 pool = get_work_pool(work);
1497 if (!pool)
1498 goto fail;
1499
1500 raw_spin_lock(&pool->lock);
1501 /*
1502 * work->data is guaranteed to point to pwq only while the work
1503 * item is queued on pwq->wq, and both updating work->data to point
1504 * to pwq on queueing and to pool on dequeueing are done under
1505 * pwq->pool->lock. This in turn guarantees that, if work->data
1506 * points to pwq which is associated with a locked pool, the work
1507 * item is currently queued on that pool.
1508 */
1509 pwq = get_work_pwq(work);
1510 if (pwq && pwq->pool == pool) {
1511 debug_work_deactivate(work);
1512
1513 /*
1514 * A cancelable inactive work item must be in the
1515 * pwq->inactive_works since a queued barrier can't be
1516 * canceled (see the comments in insert_wq_barrier()).
1517 *
1518 * An inactive work item cannot be grabbed directly because
1519 * it might have linked barrier work items which, if left
1520 * on the inactive_works list, will confuse pwq->nr_active
1521 * management later on and cause stall. Make sure the work
1522 * item is activated before grabbing.
1523 */
1524 if (*work_data_bits(work) & WORK_STRUCT_INACTIVE)
1525 pwq_activate_inactive_work(work);
1526
1527 list_del_init(&work->entry);
1528 pwq_dec_nr_in_flight(pwq, *work_data_bits(work));
1529
1530 /* work->data points to pwq iff queued, point to pool */
1531 set_work_pool_and_keep_pending(work, pool->id);
1532
1533 raw_spin_unlock(&pool->lock);
1534 rcu_read_unlock();
1535 return 1;
1536 }
1537 raw_spin_unlock(&pool->lock);
1538fail:
1539 rcu_read_unlock();
1540 local_irq_restore(*flags);
1541 if (work_is_canceling(work))
1542 return -ENOENT;
1543 cpu_relax();
1544 return -EAGAIN;
1545}
1546
1547/**
1548 * insert_work - insert a work into a pool
1549 * @pwq: pwq @work belongs to
1550 * @work: work to insert
1551 * @head: insertion point
1552 * @extra_flags: extra WORK_STRUCT_* flags to set
1553 *
1554 * Insert @work which belongs to @pwq after @head. @extra_flags is or'd to
1555 * work_struct flags.
1556 *
1557 * CONTEXT:
1558 * raw_spin_lock_irq(pool->lock).
1559 */
1560static void insert_work(struct pool_workqueue *pwq, struct work_struct *work,
1561 struct list_head *head, unsigned int extra_flags)
1562{
1563 debug_work_activate(work);
1564
1565 /* record the work call stack in order to print it in KASAN reports */
1566 kasan_record_aux_stack_noalloc(work);
1567
1568 /* we own @work, set data and link */
1569 set_work_pwq(work, pwq, extra_flags);
1570 list_add_tail(&work->entry, head);
1571 get_pwq(pwq);
1572}
1573
1574/*
1575 * Test whether @work is being queued from another work executing on the
1576 * same workqueue.
1577 */
1578static bool is_chained_work(struct workqueue_struct *wq)
1579{
1580 struct worker *worker;
1581
1582 worker = current_wq_worker();
1583 /*
1584 * Return %true iff I'm a worker executing a work item on @wq. If
1585 * I'm @worker, it's safe to dereference it without locking.
1586 */
1587 return worker && worker->current_pwq->wq == wq;
1588}
1589
1590/*
1591 * When queueing an unbound work item to a wq, prefer local CPU if allowed
1592 * by wq_unbound_cpumask. Otherwise, round robin among the allowed ones to
1593 * avoid perturbing sensitive tasks.
1594 */
1595static int wq_select_unbound_cpu(int cpu)
1596{
1597 int new_cpu;
1598
1599 if (likely(!wq_debug_force_rr_cpu)) {
1600 if (cpumask_test_cpu(cpu, wq_unbound_cpumask))
1601 return cpu;
1602 } else {
1603 pr_warn_once("workqueue: round-robin CPU selection forced, expect performance impact\n");
1604 }
1605
1606 if (cpumask_empty(wq_unbound_cpumask))
1607 return cpu;
1608
1609 new_cpu = __this_cpu_read(wq_rr_cpu_last);
1610 new_cpu = cpumask_next_and(new_cpu, wq_unbound_cpumask, cpu_online_mask);
1611 if (unlikely(new_cpu >= nr_cpu_ids)) {
1612 new_cpu = cpumask_first_and(wq_unbound_cpumask, cpu_online_mask);
1613 if (unlikely(new_cpu >= nr_cpu_ids))
1614 return cpu;
1615 }
1616 __this_cpu_write(wq_rr_cpu_last, new_cpu);
1617
1618 return new_cpu;
1619}
1620
1621static void __queue_work(int cpu, struct workqueue_struct *wq,
1622 struct work_struct *work)
1623{
1624 struct pool_workqueue *pwq;
1625 struct worker_pool *last_pool, *pool;
1626 unsigned int work_flags;
1627 unsigned int req_cpu = cpu;
1628
1629 /*
1630 * While a work item is PENDING && off queue, a task trying to
1631 * steal the PENDING will busy-loop waiting for it to either get
1632 * queued or lose PENDING. Grabbing PENDING and queueing should
1633 * happen with IRQ disabled.
1634 */
1635 lockdep_assert_irqs_disabled();
1636
1637
1638 /*
1639 * For a draining wq, only works from the same workqueue are
1640 * allowed. The __WQ_DESTROYING helps to spot the issue that
1641 * queues a new work item to a wq after destroy_workqueue(wq).
1642 */
1643 if (unlikely(wq->flags & (__WQ_DESTROYING | __WQ_DRAINING) &&
1644 WARN_ON_ONCE(!is_chained_work(wq))))
1645 return;
1646 rcu_read_lock();
1647retry:
1648 /* pwq which will be used unless @work is executing elsewhere */
1649 if (req_cpu == WORK_CPU_UNBOUND) {
1650 if (wq->flags & WQ_UNBOUND)
1651 cpu = wq_select_unbound_cpu(raw_smp_processor_id());
1652 else
1653 cpu = raw_smp_processor_id();
1654 }
1655
1656 pwq = rcu_dereference(*per_cpu_ptr(wq->cpu_pwq, cpu));
1657 pool = pwq->pool;
1658
1659 /*
1660 * If @work was previously on a different pool, it might still be
1661 * running there, in which case the work needs to be queued on that
1662 * pool to guarantee non-reentrancy.
1663 */
1664 last_pool = get_work_pool(work);
1665 if (last_pool && last_pool != pool) {
1666 struct worker *worker;
1667
1668 raw_spin_lock(&last_pool->lock);
1669
1670 worker = find_worker_executing_work(last_pool, work);
1671
1672 if (worker && worker->current_pwq->wq == wq) {
1673 pwq = worker->current_pwq;
1674 pool = pwq->pool;
1675 WARN_ON_ONCE(pool != last_pool);
1676 } else {
1677 /* meh... not running there, queue here */
1678 raw_spin_unlock(&last_pool->lock);
1679 raw_spin_lock(&pool->lock);
1680 }
1681 } else {
1682 raw_spin_lock(&pool->lock);
1683 }
1684
1685 /*
1686 * pwq is determined and locked. For unbound pools, we could have raced
1687 * with pwq release and it could already be dead. If its refcnt is zero,
1688 * repeat pwq selection. Note that unbound pwqs never die without
1689 * another pwq replacing it in cpu_pwq or while work items are executing
1690 * on it, so the retrying is guaranteed to make forward-progress.
1691 */
1692 if (unlikely(!pwq->refcnt)) {
1693 if (wq->flags & WQ_UNBOUND) {
1694 raw_spin_unlock(&pool->lock);
1695 cpu_relax();
1696 goto retry;
1697 }
1698 /* oops */
1699 WARN_ONCE(true, "workqueue: per-cpu pwq for %s on cpu%d has 0 refcnt",
1700 wq->name, cpu);
1701 }
1702
1703 /* pwq determined, queue */
1704 trace_workqueue_queue_work(req_cpu, pwq, work);
1705
1706 if (WARN_ON(!list_empty(&work->entry)))
1707 goto out;
1708
1709 pwq->nr_in_flight[pwq->work_color]++;
1710 work_flags = work_color_to_flags(pwq->work_color);
1711
1712 if (likely(pwq->nr_active < pwq->max_active)) {
1713 if (list_empty(&pool->worklist))
1714 pool->watchdog_ts = jiffies;
1715
1716 trace_workqueue_activate_work(work);
1717 pwq->nr_active++;
1718 insert_work(pwq, work, &pool->worklist, work_flags);
1719
1720 if (__need_more_worker(pool))
1721 wake_up_worker(pool);
1722 } else {
1723 work_flags |= WORK_STRUCT_INACTIVE;
1724 insert_work(pwq, work, &pwq->inactive_works, work_flags);
1725 }
1726
1727out:
1728 raw_spin_unlock(&pool->lock);
1729 rcu_read_unlock();
1730}
1731
1732/**
1733 * queue_work_on - queue work on specific cpu
1734 * @cpu: CPU number to execute work on
1735 * @wq: workqueue to use
1736 * @work: work to queue
1737 *
1738 * We queue the work to a specific CPU, the caller must ensure it
1739 * can't go away. Callers that fail to ensure that the specified
1740 * CPU cannot go away will execute on a randomly chosen CPU.
1741 * But note well that callers specifying a CPU that never has been
1742 * online will get a splat.
1743 *
1744 * Return: %false if @work was already on a queue, %true otherwise.
1745 */
1746bool queue_work_on(int cpu, struct workqueue_struct *wq,
1747 struct work_struct *work)
1748{
1749 bool ret = false;
1750 unsigned long flags;
1751
1752 local_irq_save(flags);
1753
1754 if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
1755 __queue_work(cpu, wq, work);
1756 ret = true;
1757 }
1758
1759 local_irq_restore(flags);
1760 return ret;
1761}
1762EXPORT_SYMBOL(queue_work_on);
1763
1764/**
1765 * workqueue_select_cpu_near - Select a CPU based on NUMA node
1766 * @node: NUMA node ID that we want to select a CPU from
1767 *
1768 * This function will attempt to find a "random" cpu available on a given
1769 * node. If there are no CPUs available on the given node it will return
1770 * WORK_CPU_UNBOUND indicating that we should just schedule to any
1771 * available CPU if we need to schedule this work.
1772 */
1773static int workqueue_select_cpu_near(int node)
1774{
1775 int cpu;
1776
1777 /* No point in doing this if NUMA isn't enabled for workqueues */
1778 if (!wq_numa_enabled)
1779 return WORK_CPU_UNBOUND;
1780
1781 /* Delay binding to CPU if node is not valid or online */
1782 if (node < 0 || node >= MAX_NUMNODES || !node_online(node))
1783 return WORK_CPU_UNBOUND;
1784
1785 /* Use local node/cpu if we are already there */
1786 cpu = raw_smp_processor_id();
1787 if (node == cpu_to_node(cpu))
1788 return cpu;
1789
1790 /* Use "random" otherwise know as "first" online CPU of node */
1791 cpu = cpumask_any_and(cpumask_of_node(node), cpu_online_mask);
1792
1793 /* If CPU is valid return that, otherwise just defer */
1794 return cpu < nr_cpu_ids ? cpu : WORK_CPU_UNBOUND;
1795}
1796
1797/**
1798 * queue_work_node - queue work on a "random" cpu for a given NUMA node
1799 * @node: NUMA node that we are targeting the work for
1800 * @wq: workqueue to use
1801 * @work: work to queue
1802 *
1803 * We queue the work to a "random" CPU within a given NUMA node. The basic
1804 * idea here is to provide a way to somehow associate work with a given
1805 * NUMA node.
1806 *
1807 * This function will only make a best effort attempt at getting this onto
1808 * the right NUMA node. If no node is requested or the requested node is
1809 * offline then we just fall back to standard queue_work behavior.
1810 *
1811 * Currently the "random" CPU ends up being the first available CPU in the
1812 * intersection of cpu_online_mask and the cpumask of the node, unless we
1813 * are running on the node. In that case we just use the current CPU.
1814 *
1815 * Return: %false if @work was already on a queue, %true otherwise.
1816 */
1817bool queue_work_node(int node, struct workqueue_struct *wq,
1818 struct work_struct *work)
1819{
1820 unsigned long flags;
1821 bool ret = false;
1822
1823 /*
1824 * This current implementation is specific to unbound workqueues.
1825 * Specifically we only return the first available CPU for a given
1826 * node instead of cycling through individual CPUs within the node.
1827 *
1828 * If this is used with a per-cpu workqueue then the logic in
1829 * workqueue_select_cpu_near would need to be updated to allow for
1830 * some round robin type logic.
1831 */
1832 WARN_ON_ONCE(!(wq->flags & WQ_UNBOUND));
1833
1834 local_irq_save(flags);
1835
1836 if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
1837 int cpu = workqueue_select_cpu_near(node);
1838
1839 __queue_work(cpu, wq, work);
1840 ret = true;
1841 }
1842
1843 local_irq_restore(flags);
1844 return ret;
1845}
1846EXPORT_SYMBOL_GPL(queue_work_node);
1847
1848void delayed_work_timer_fn(struct timer_list *t)
1849{
1850 struct delayed_work *dwork = from_timer(dwork, t, timer);
1851
1852 /* should have been called from irqsafe timer with irq already off */
1853 __queue_work(dwork->cpu, dwork->wq, &dwork->work);
1854}
1855EXPORT_SYMBOL(delayed_work_timer_fn);
1856
1857static void __queue_delayed_work(int cpu, struct workqueue_struct *wq,
1858 struct delayed_work *dwork, unsigned long delay)
1859{
1860 struct timer_list *timer = &dwork->timer;
1861 struct work_struct *work = &dwork->work;
1862
1863 WARN_ON_ONCE(!wq);
1864 WARN_ON_ONCE(timer->function != delayed_work_timer_fn);
1865 WARN_ON_ONCE(timer_pending(timer));
1866 WARN_ON_ONCE(!list_empty(&work->entry));
1867
1868 /*
1869 * If @delay is 0, queue @dwork->work immediately. This is for
1870 * both optimization and correctness. The earliest @timer can
1871 * expire is on the closest next tick and delayed_work users depend
1872 * on that there's no such delay when @delay is 0.
1873 */
1874 if (!delay) {
1875 __queue_work(cpu, wq, &dwork->work);
1876 return;
1877 }
1878
1879 dwork->wq = wq;
1880 dwork->cpu = cpu;
1881 timer->expires = jiffies + delay;
1882
1883 if (unlikely(cpu != WORK_CPU_UNBOUND))
1884 add_timer_on(timer, cpu);
1885 else
1886 add_timer(timer);
1887}
1888
1889/**
1890 * queue_delayed_work_on - queue work on specific CPU after delay
1891 * @cpu: CPU number to execute work on
1892 * @wq: workqueue to use
1893 * @dwork: work to queue
1894 * @delay: number of jiffies to wait before queueing
1895 *
1896 * Return: %false if @work was already on a queue, %true otherwise. If
1897 * @delay is zero and @dwork is idle, it will be scheduled for immediate
1898 * execution.
1899 */
1900bool queue_delayed_work_on(int cpu, struct workqueue_struct *wq,
1901 struct delayed_work *dwork, unsigned long delay)
1902{
1903 struct work_struct *work = &dwork->work;
1904 bool ret = false;
1905 unsigned long flags;
1906
1907 /* read the comment in __queue_work() */
1908 local_irq_save(flags);
1909
1910 if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
1911 __queue_delayed_work(cpu, wq, dwork, delay);
1912 ret = true;
1913 }
1914
1915 local_irq_restore(flags);
1916 return ret;
1917}
1918EXPORT_SYMBOL(queue_delayed_work_on);
1919
1920/**
1921 * mod_delayed_work_on - modify delay of or queue a delayed work on specific CPU
1922 * @cpu: CPU number to execute work on
1923 * @wq: workqueue to use
1924 * @dwork: work to queue
1925 * @delay: number of jiffies to wait before queueing
1926 *
1927 * If @dwork is idle, equivalent to queue_delayed_work_on(); otherwise,
1928 * modify @dwork's timer so that it expires after @delay. If @delay is
1929 * zero, @work is guaranteed to be scheduled immediately regardless of its
1930 * current state.
1931 *
1932 * Return: %false if @dwork was idle and queued, %true if @dwork was
1933 * pending and its timer was modified.
1934 *
1935 * This function is safe to call from any context including IRQ handler.
1936 * See try_to_grab_pending() for details.
1937 */
1938bool mod_delayed_work_on(int cpu, struct workqueue_struct *wq,
1939 struct delayed_work *dwork, unsigned long delay)
1940{
1941 unsigned long flags;
1942 int ret;
1943
1944 do {
1945 ret = try_to_grab_pending(&dwork->work, true, &flags);
1946 } while (unlikely(ret == -EAGAIN));
1947
1948 if (likely(ret >= 0)) {
1949 __queue_delayed_work(cpu, wq, dwork, delay);
1950 local_irq_restore(flags);
1951 }
1952
1953 /* -ENOENT from try_to_grab_pending() becomes %true */
1954 return ret;
1955}
1956EXPORT_SYMBOL_GPL(mod_delayed_work_on);
1957
1958static void rcu_work_rcufn(struct rcu_head *rcu)
1959{
1960 struct rcu_work *rwork = container_of(rcu, struct rcu_work, rcu);
1961
1962 /* read the comment in __queue_work() */
1963 local_irq_disable();
1964 __queue_work(WORK_CPU_UNBOUND, rwork->wq, &rwork->work);
1965 local_irq_enable();
1966}
1967
1968/**
1969 * queue_rcu_work - queue work after a RCU grace period
1970 * @wq: workqueue to use
1971 * @rwork: work to queue
1972 *
1973 * Return: %false if @rwork was already pending, %true otherwise. Note
1974 * that a full RCU grace period is guaranteed only after a %true return.
1975 * While @rwork is guaranteed to be executed after a %false return, the
1976 * execution may happen before a full RCU grace period has passed.
1977 */
1978bool queue_rcu_work(struct workqueue_struct *wq, struct rcu_work *rwork)
1979{
1980 struct work_struct *work = &rwork->work;
1981
1982 if (!test_and_set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(work))) {
1983 rwork->wq = wq;
1984 call_rcu_hurry(&rwork->rcu, rcu_work_rcufn);
1985 return true;
1986 }
1987
1988 return false;
1989}
1990EXPORT_SYMBOL(queue_rcu_work);
1991
1992static struct worker *alloc_worker(int node)
1993{
1994 struct worker *worker;
1995
1996 worker = kzalloc_node(sizeof(*worker), GFP_KERNEL, node);
1997 if (worker) {
1998 INIT_LIST_HEAD(&worker->entry);
1999 INIT_LIST_HEAD(&worker->scheduled);
2000 INIT_LIST_HEAD(&worker->node);
2001 /* on creation a worker is in !idle && prep state */
2002 worker->flags = WORKER_PREP;
2003 }
2004 return worker;
2005}
2006
2007/**
2008 * worker_attach_to_pool() - attach a worker to a pool
2009 * @worker: worker to be attached
2010 * @pool: the target pool
2011 *
2012 * Attach @worker to @pool. Once attached, the %WORKER_UNBOUND flag and
2013 * cpu-binding of @worker are kept coordinated with the pool across
2014 * cpu-[un]hotplugs.
2015 */
2016static void worker_attach_to_pool(struct worker *worker,
2017 struct worker_pool *pool)
2018{
2019 mutex_lock(&wq_pool_attach_mutex);
2020
2021 /*
2022 * The wq_pool_attach_mutex ensures %POOL_DISASSOCIATED remains
2023 * stable across this function. See the comments above the flag
2024 * definition for details.
2025 */
2026 if (pool->flags & POOL_DISASSOCIATED)
2027 worker->flags |= WORKER_UNBOUND;
2028 else
2029 kthread_set_per_cpu(worker->task, pool->cpu);
2030
2031 if (worker->rescue_wq)
2032 set_cpus_allowed_ptr(worker->task, pool->attrs->cpumask);
2033
2034 list_add_tail(&worker->node, &pool->workers);
2035 worker->pool = pool;
2036
2037 mutex_unlock(&wq_pool_attach_mutex);
2038}
2039
2040/**
2041 * worker_detach_from_pool() - detach a worker from its pool
2042 * @worker: worker which is attached to its pool
2043 *
2044 * Undo the attaching which had been done in worker_attach_to_pool(). The
2045 * caller worker shouldn't access to the pool after detached except it has
2046 * other reference to the pool.
2047 */
2048static void worker_detach_from_pool(struct worker *worker)
2049{
2050 struct worker_pool *pool = worker->pool;
2051 struct completion *detach_completion = NULL;
2052
2053 mutex_lock(&wq_pool_attach_mutex);
2054
2055 kthread_set_per_cpu(worker->task, -1);
2056 list_del(&worker->node);
2057 worker->pool = NULL;
2058
2059 if (list_empty(&pool->workers) && list_empty(&pool->dying_workers))
2060 detach_completion = pool->detach_completion;
2061 mutex_unlock(&wq_pool_attach_mutex);
2062
2063 /* clear leftover flags without pool->lock after it is detached */
2064 worker->flags &= ~(WORKER_UNBOUND | WORKER_REBOUND);
2065
2066 if (detach_completion)
2067 complete(detach_completion);
2068}
2069
2070/**
2071 * create_worker - create a new workqueue worker
2072 * @pool: pool the new worker will belong to
2073 *
2074 * Create and start a new worker which is attached to @pool.
2075 *
2076 * CONTEXT:
2077 * Might sleep. Does GFP_KERNEL allocations.
2078 *
2079 * Return:
2080 * Pointer to the newly created worker.
2081 */
2082static struct worker *create_worker(struct worker_pool *pool)
2083{
2084 struct worker *worker;
2085 int id;
2086 char id_buf[16];
2087
2088 /* ID is needed to determine kthread name */
2089 id = ida_alloc(&pool->worker_ida, GFP_KERNEL);
2090 if (id < 0) {
2091 pr_err_once("workqueue: Failed to allocate a worker ID: %pe\n",
2092 ERR_PTR(id));
2093 return NULL;
2094 }
2095
2096 worker = alloc_worker(pool->node);
2097 if (!worker) {
2098 pr_err_once("workqueue: Failed to allocate a worker\n");
2099 goto fail;
2100 }
2101
2102 worker->id = id;
2103
2104 if (pool->cpu >= 0)
2105 snprintf(id_buf, sizeof(id_buf), "%d:%d%s", pool->cpu, id,
2106 pool->attrs->nice < 0 ? "H" : "");
2107 else
2108 snprintf(id_buf, sizeof(id_buf), "u%d:%d", pool->id, id);
2109
2110 worker->task = kthread_create_on_node(worker_thread, worker, pool->node,
2111 "kworker/%s", id_buf);
2112 if (IS_ERR(worker->task)) {
2113 if (PTR_ERR(worker->task) == -EINTR) {
2114 pr_err("workqueue: Interrupted when creating a worker thread \"kworker/%s\"\n",
2115 id_buf);
2116 } else {
2117 pr_err_once("workqueue: Failed to create a worker thread: %pe",
2118 worker->task);
2119 }
2120 goto fail;
2121 }
2122
2123 set_user_nice(worker->task, pool->attrs->nice);
2124 kthread_bind_mask(worker->task, pool->attrs->cpumask);
2125
2126 /* successful, attach the worker to the pool */
2127 worker_attach_to_pool(worker, pool);
2128
2129 /* start the newly created worker */
2130 raw_spin_lock_irq(&pool->lock);
2131 worker->pool->nr_workers++;
2132 worker_enter_idle(worker);
2133 wake_up_process(worker->task);
2134 raw_spin_unlock_irq(&pool->lock);
2135
2136 return worker;
2137
2138fail:
2139 ida_free(&pool->worker_ida, id);
2140 kfree(worker);
2141 return NULL;
2142}
2143
2144static void unbind_worker(struct worker *worker)
2145{
2146 lockdep_assert_held(&wq_pool_attach_mutex);
2147
2148 kthread_set_per_cpu(worker->task, -1);
2149 if (cpumask_intersects(wq_unbound_cpumask, cpu_active_mask))
2150 WARN_ON_ONCE(set_cpus_allowed_ptr(worker->task, wq_unbound_cpumask) < 0);
2151 else
2152 WARN_ON_ONCE(set_cpus_allowed_ptr(worker->task, cpu_possible_mask) < 0);
2153}
2154
2155static void wake_dying_workers(struct list_head *cull_list)
2156{
2157 struct worker *worker, *tmp;
2158
2159 list_for_each_entry_safe(worker, tmp, cull_list, entry) {
2160 list_del_init(&worker->entry);
2161 unbind_worker(worker);
2162 /*
2163 * If the worker was somehow already running, then it had to be
2164 * in pool->idle_list when set_worker_dying() happened or we
2165 * wouldn't have gotten here.
2166 *
2167 * Thus, the worker must either have observed the WORKER_DIE
2168 * flag, or have set its state to TASK_IDLE. Either way, the
2169 * below will be observed by the worker and is safe to do
2170 * outside of pool->lock.
2171 */
2172 wake_up_process(worker->task);
2173 }
2174}
2175
2176/**
2177 * set_worker_dying - Tag a worker for destruction
2178 * @worker: worker to be destroyed
2179 * @list: transfer worker away from its pool->idle_list and into list
2180 *
2181 * Tag @worker for destruction and adjust @pool stats accordingly. The worker
2182 * should be idle.
2183 *
2184 * CONTEXT:
2185 * raw_spin_lock_irq(pool->lock).
2186 */
2187static void set_worker_dying(struct worker *worker, struct list_head *list)
2188{
2189 struct worker_pool *pool = worker->pool;
2190
2191 lockdep_assert_held(&pool->lock);
2192 lockdep_assert_held(&wq_pool_attach_mutex);
2193
2194 /* sanity check frenzy */
2195 if (WARN_ON(worker->current_work) ||
2196 WARN_ON(!list_empty(&worker->scheduled)) ||
2197 WARN_ON(!(worker->flags & WORKER_IDLE)))
2198 return;
2199
2200 pool->nr_workers--;
2201 pool->nr_idle--;
2202
2203 worker->flags |= WORKER_DIE;
2204
2205 list_move(&worker->entry, list);
2206 list_move(&worker->node, &pool->dying_workers);
2207}
2208
2209/**
2210 * idle_worker_timeout - check if some idle workers can now be deleted.
2211 * @t: The pool's idle_timer that just expired
2212 *
2213 * The timer is armed in worker_enter_idle(). Note that it isn't disarmed in
2214 * worker_leave_idle(), as a worker flicking between idle and active while its
2215 * pool is at the too_many_workers() tipping point would cause too much timer
2216 * housekeeping overhead. Since IDLE_WORKER_TIMEOUT is long enough, we just let
2217 * it expire and re-evaluate things from there.
2218 */
2219static void idle_worker_timeout(struct timer_list *t)
2220{
2221 struct worker_pool *pool = from_timer(pool, t, idle_timer);
2222 bool do_cull = false;
2223
2224 if (work_pending(&pool->idle_cull_work))
2225 return;
2226
2227 raw_spin_lock_irq(&pool->lock);
2228
2229 if (too_many_workers(pool)) {
2230 struct worker *worker;
2231 unsigned long expires;
2232
2233 /* idle_list is kept in LIFO order, check the last one */
2234 worker = list_entry(pool->idle_list.prev, struct worker, entry);
2235 expires = worker->last_active + IDLE_WORKER_TIMEOUT;
2236 do_cull = !time_before(jiffies, expires);
2237
2238 if (!do_cull)
2239 mod_timer(&pool->idle_timer, expires);
2240 }
2241 raw_spin_unlock_irq(&pool->lock);
2242
2243 if (do_cull)
2244 queue_work(system_unbound_wq, &pool->idle_cull_work);
2245}
2246
2247/**
2248 * idle_cull_fn - cull workers that have been idle for too long.
2249 * @work: the pool's work for handling these idle workers
2250 *
2251 * This goes through a pool's idle workers and gets rid of those that have been
2252 * idle for at least IDLE_WORKER_TIMEOUT seconds.
2253 *
2254 * We don't want to disturb isolated CPUs because of a pcpu kworker being
2255 * culled, so this also resets worker affinity. This requires a sleepable
2256 * context, hence the split between timer callback and work item.
2257 */
2258static void idle_cull_fn(struct work_struct *work)
2259{
2260 struct worker_pool *pool = container_of(work, struct worker_pool, idle_cull_work);
2261 LIST_HEAD(cull_list);
2262
2263 /*
2264 * Grabbing wq_pool_attach_mutex here ensures an already-running worker
2265 * cannot proceed beyong worker_detach_from_pool() in its self-destruct
2266 * path. This is required as a previously-preempted worker could run after
2267 * set_worker_dying() has happened but before wake_dying_workers() did.
2268 */
2269 mutex_lock(&wq_pool_attach_mutex);
2270 raw_spin_lock_irq(&pool->lock);
2271
2272 while (too_many_workers(pool)) {
2273 struct worker *worker;
2274 unsigned long expires;
2275
2276 worker = list_entry(pool->idle_list.prev, struct worker, entry);
2277 expires = worker->last_active + IDLE_WORKER_TIMEOUT;
2278
2279 if (time_before(jiffies, expires)) {
2280 mod_timer(&pool->idle_timer, expires);
2281 break;
2282 }
2283
2284 set_worker_dying(worker, &cull_list);
2285 }
2286
2287 raw_spin_unlock_irq(&pool->lock);
2288 wake_dying_workers(&cull_list);
2289 mutex_unlock(&wq_pool_attach_mutex);
2290}
2291
2292static void send_mayday(struct work_struct *work)
2293{
2294 struct pool_workqueue *pwq = get_work_pwq(work);
2295 struct workqueue_struct *wq = pwq->wq;
2296
2297 lockdep_assert_held(&wq_mayday_lock);
2298
2299 if (!wq->rescuer)
2300 return;
2301
2302 /* mayday mayday mayday */
2303 if (list_empty(&pwq->mayday_node)) {
2304 /*
2305 * If @pwq is for an unbound wq, its base ref may be put at
2306 * any time due to an attribute change. Pin @pwq until the
2307 * rescuer is done with it.
2308 */
2309 get_pwq(pwq);
2310 list_add_tail(&pwq->mayday_node, &wq->maydays);
2311 wake_up_process(wq->rescuer->task);
2312 pwq->stats[PWQ_STAT_MAYDAY]++;
2313 }
2314}
2315
2316static void pool_mayday_timeout(struct timer_list *t)
2317{
2318 struct worker_pool *pool = from_timer(pool, t, mayday_timer);
2319 struct work_struct *work;
2320
2321 raw_spin_lock_irq(&pool->lock);
2322 raw_spin_lock(&wq_mayday_lock); /* for wq->maydays */
2323
2324 if (need_to_create_worker(pool)) {
2325 /*
2326 * We've been trying to create a new worker but
2327 * haven't been successful. We might be hitting an
2328 * allocation deadlock. Send distress signals to
2329 * rescuers.
2330 */
2331 list_for_each_entry(work, &pool->worklist, entry)
2332 send_mayday(work);
2333 }
2334
2335 raw_spin_unlock(&wq_mayday_lock);
2336 raw_spin_unlock_irq(&pool->lock);
2337
2338 mod_timer(&pool->mayday_timer, jiffies + MAYDAY_INTERVAL);
2339}
2340
2341/**
2342 * maybe_create_worker - create a new worker if necessary
2343 * @pool: pool to create a new worker for
2344 *
2345 * Create a new worker for @pool if necessary. @pool is guaranteed to
2346 * have at least one idle worker on return from this function. If
2347 * creating a new worker takes longer than MAYDAY_INTERVAL, mayday is
2348 * sent to all rescuers with works scheduled on @pool to resolve
2349 * possible allocation deadlock.
2350 *
2351 * On return, need_to_create_worker() is guaranteed to be %false and
2352 * may_start_working() %true.
2353 *
2354 * LOCKING:
2355 * raw_spin_lock_irq(pool->lock) which may be released and regrabbed
2356 * multiple times. Does GFP_KERNEL allocations. Called only from
2357 * manager.
2358 */
2359static void maybe_create_worker(struct worker_pool *pool)
2360__releases(&pool->lock)
2361__acquires(&pool->lock)
2362{
2363restart:
2364 raw_spin_unlock_irq(&pool->lock);
2365
2366 /* if we don't make progress in MAYDAY_INITIAL_TIMEOUT, call for help */
2367 mod_timer(&pool->mayday_timer, jiffies + MAYDAY_INITIAL_TIMEOUT);
2368
2369 while (true) {
2370 if (create_worker(pool) || !need_to_create_worker(pool))
2371 break;
2372
2373 schedule_timeout_interruptible(CREATE_COOLDOWN);
2374
2375 if (!need_to_create_worker(pool))
2376 break;
2377 }
2378
2379 del_timer_sync(&pool->mayday_timer);
2380 raw_spin_lock_irq(&pool->lock);
2381 /*
2382 * This is necessary even after a new worker was just successfully
2383 * created as @pool->lock was dropped and the new worker might have
2384 * already become busy.
2385 */
2386 if (need_to_create_worker(pool))
2387 goto restart;
2388}
2389
2390/**
2391 * manage_workers - manage worker pool
2392 * @worker: self
2393 *
2394 * Assume the manager role and manage the worker pool @worker belongs
2395 * to. At any given time, there can be only zero or one manager per
2396 * pool. The exclusion is handled automatically by this function.
2397 *
2398 * The caller can safely start processing works on false return. On
2399 * true return, it's guaranteed that need_to_create_worker() is false
2400 * and may_start_working() is true.
2401 *
2402 * CONTEXT:
2403 * raw_spin_lock_irq(pool->lock) which may be released and regrabbed
2404 * multiple times. Does GFP_KERNEL allocations.
2405 *
2406 * Return:
2407 * %false if the pool doesn't need management and the caller can safely
2408 * start processing works, %true if management function was performed and
2409 * the conditions that the caller verified before calling the function may
2410 * no longer be true.
2411 */
2412static bool manage_workers(struct worker *worker)
2413{
2414 struct worker_pool *pool = worker->pool;
2415
2416 if (pool->flags & POOL_MANAGER_ACTIVE)
2417 return false;
2418
2419 pool->flags |= POOL_MANAGER_ACTIVE;
2420 pool->manager = worker;
2421
2422 maybe_create_worker(pool);
2423
2424 pool->manager = NULL;
2425 pool->flags &= ~POOL_MANAGER_ACTIVE;
2426 rcuwait_wake_up(&manager_wait);
2427 return true;
2428}
2429
2430/**
2431 * process_one_work - process single work
2432 * @worker: self
2433 * @work: work to process
2434 *
2435 * Process @work. This function contains all the logic necessary to
2436 * process a single work including synchronization against and
2437 * interaction with other workers on the same cpu, queueing and
2438 * flushing. As long as context requirement is met, any worker can
2439 * call this function to process a work.
2440 *
2441 * CONTEXT:
2442 * raw_spin_lock_irq(pool->lock) which is released and regrabbed.
2443 */
2444static void process_one_work(struct worker *worker, struct work_struct *work)
2445__releases(&pool->lock)
2446__acquires(&pool->lock)
2447{
2448 struct pool_workqueue *pwq = get_work_pwq(work);
2449 struct worker_pool *pool = worker->pool;
2450 unsigned long work_data;
2451 struct worker *collision;
2452#ifdef CONFIG_LOCKDEP
2453 /*
2454 * It is permissible to free the struct work_struct from
2455 * inside the function that is called from it, this we need to
2456 * take into account for lockdep too. To avoid bogus "held
2457 * lock freed" warnings as well as problems when looking into
2458 * work->lockdep_map, make a copy and use that here.
2459 */
2460 struct lockdep_map lockdep_map;
2461
2462 lockdep_copy_map(&lockdep_map, &work->lockdep_map);
2463#endif
2464 /* ensure we're on the correct CPU */
2465 WARN_ON_ONCE(!(pool->flags & POOL_DISASSOCIATED) &&
2466 raw_smp_processor_id() != pool->cpu);
2467
2468 /*
2469 * A single work shouldn't be executed concurrently by
2470 * multiple workers on a single cpu. Check whether anyone is
2471 * already processing the work. If so, defer the work to the
2472 * currently executing one.
2473 */
2474 collision = find_worker_executing_work(pool, work);
2475 if (unlikely(collision)) {
2476 move_linked_works(work, &collision->scheduled, NULL);
2477 return;
2478 }
2479
2480 /* claim and dequeue */
2481 debug_work_deactivate(work);
2482 hash_add(pool->busy_hash, &worker->hentry, (unsigned long)work);
2483 worker->current_work = work;
2484 worker->current_func = work->func;
2485 worker->current_pwq = pwq;
2486 worker->current_at = worker->task->se.sum_exec_runtime;
	/*
	 * Snapshot the data word now: it is needed again after the callback
	 * has run, when @work itself must no longer be touched.
	 */
2487 work_data = *work_data_bits(work);
2488 worker->current_color = get_work_color(work_data);
2489
2490 /*
2491 * Record wq name for cmdline and debug reporting, may get
2492 * overridden through set_worker_desc().
2493 */
2494 strscpy(worker->desc, pwq->wq->name, WORKER_DESC_LEN);
2495
2496 list_del_init(&work->entry);
2497
2498 /*
2499 * CPU intensive works don't participate in concurrency management.
2500 * They're the scheduler's responsibility. This takes @worker out
2501 * of concurrency management and the next code block will chain
2502 * execution of the pending work items.
2503 */
2504 if (unlikely(pwq->wq->flags & WQ_CPU_INTENSIVE))
2505 worker_set_flags(worker, WORKER_CPU_INTENSIVE);
2506
2507 /*
2508 * Wake up another worker if necessary. The condition is always
2509 * false for normal per-cpu workers since nr_running would always
2510 * be >= 1 at this point. This is used to chain execution of the
2511 * pending work items for WORKER_NOT_RUNNING workers such as the
2512 * UNBOUND and CPU_INTENSIVE ones.
2513 */
2514 if (need_more_worker(pool))
2515 wake_up_worker(pool);
2516
2517 /*
2518 * Record the last pool and clear PENDING which should be the last
2519 * update to @work. Also, do this inside @pool->lock so that
2520 * PENDING and queued state changes happen together while IRQ is
2521 * disabled.
2522 */
2523 set_work_pool_and_clear_pending(work, pool->id);
2524
2525 raw_spin_unlock_irq(&pool->lock);
2526
2527 lock_map_acquire(&pwq->wq->lockdep_map);
2528 lock_map_acquire(&lockdep_map);
2529 /*
2530 * Strictly speaking we should mark the invariant state without holding
2531 * any locks, that is, before these two lock_map_acquire()'s.
2532 *
2533 * However, that would result in:
2534 *
2535 * A(W1)
2536 * WFC(C)
2537 * A(W1)
2538 * C(C)
2539 *
2540 * Which would create W1->C->W1 dependencies, even though there is no
2541 * actual deadlock possible. There are two solutions, using a
2542 * read-recursive acquire on the work(queue) 'locks', but this will then
2543 * hit the lockdep limitation on recursive locks, or simply discard
2544 * these locks.
2545 *
2546 * AFAICT there is no possible deadlock scenario between the
2547 * flush_work() and complete() primitives (except for single-threaded
2548 * workqueues), so hiding them isn't a problem.
2549 */
2550 lockdep_invariant_state(true);
2551 pwq->stats[PWQ_STAT_STARTED]++;
2552 trace_workqueue_execute_start(work);
	/* run the callback through the copy saved in worker->current_func */
2553 worker->current_func(work);
2554 /*
2555 * While we must be careful to not use "work" after this, the trace
2556 * point will only record its address.
2557 */
2558 trace_workqueue_execute_end(work, worker->current_func);
2559 pwq->stats[PWQ_STAT_COMPLETED]++;
2560 lock_map_release(&lockdep_map);
2561 lock_map_release(&pwq->wq->lockdep_map);
2562
	/*
	 * The callback returned in atomic context or with lockdep still
	 * tracking held locks: report it together with the held locks and a
	 * stack dump.
	 */
2563 if (unlikely(in_atomic() || lockdep_depth(current) > 0)) {
2564 pr_err("BUG: workqueue leaked lock or atomic: %s/0x%08x/%d\n"
2565 " last function: %ps\n",
2566 current->comm, preempt_count(), task_pid_nr(current),
2567 worker->current_func);
2568 debug_show_held_locks(current);
2569 dump_stack();
2570 }
2571
2572 /*
2573 * The following prevents a kworker from hogging CPU on !PREEMPTION
2574 * kernels, where a requeueing work item waiting for something to
2575 * happen could deadlock with stop_machine as such work item could
2576 * indefinitely requeue itself while all other CPUs are trapped in
2577 * stop_machine. At the same time, report a quiescent RCU state so
2578 * the same condition doesn't freeze RCU.
2579 */
2580 cond_resched();
2581
2582 raw_spin_lock_irq(&pool->lock);
2583
2584 /*
2585 * In addition to %WQ_CPU_INTENSIVE, @worker may also have been marked
2586 * CPU intensive by wq_worker_tick() if @work hogged CPU longer than
2587 * wq_cpu_intensive_thresh_us. Clear it.
2588 */
2589 worker_clr_flags(worker, WORKER_CPU_INTENSIVE);
2590
2591 /* tag the worker for identification in schedule() */
2592 worker->last_func = worker->current_func;
2593
2594 /* we're done with it, release */
2595 hash_del(&worker->hentry);
2596 worker->current_work = NULL;
2597 worker->current_func = NULL;
2598 worker->current_pwq = NULL;
2599 worker->current_color = INT_MAX;
	/* uses the data word sampled before the callback, not @work */
2600 pwq_dec_nr_in_flight(pwq, work_data);
2601}
2602
2603/**
2604 * process_scheduled_works - process scheduled works
2605 * @worker: self
2606 *
2607 * Process all scheduled works. Please note that the scheduled list
2608 * may change while processing a work, so this function repeatedly
2609 * fetches a work from the top and executes it.
2610 *
2611 * CONTEXT:
2612 * raw_spin_lock_irq(pool->lock) which may be released and regrabbed
2613 * multiple times.
2614 */
2615static void process_scheduled_works(struct worker *worker)
2616{
2617 struct work_struct *work;
2618 bool first = true;
2619
2620 while ((work = list_first_entry_or_null(&worker->scheduled,
2621 struct work_struct, entry))) {
2622 if (first) {
2623 worker->pool->watchdog_ts = jiffies;
2624 first = false;
2625 }
2626 process_one_work(worker, work);
2627 }
2628}
2629
2630static void set_pf_worker(bool val)
2631{
2632 mutex_lock(&wq_pool_attach_mutex);
2633 if (val)
2634 current->flags |= PF_WQ_WORKER;
2635 else
2636 current->flags &= ~PF_WQ_WORKER;
2637 mutex_unlock(&wq_pool_attach_mutex);
2638}
2639
2640/**
2641 * worker_thread - the worker thread function
2642 * @__worker: self
2643 *
2644 * The worker thread function. All workers belong to a worker_pool -
2645 * either a per-cpu one or dynamic unbound one. These workers process all
2646 * work items regardless of their specific target workqueue. The only
2647 * exception is work items which belong to workqueues with a rescuer which
2648 * will be explained in rescuer_thread().
2649 *
2650 * Return: 0
2651 */
2652static int worker_thread(void *__worker)
2653{
2654 struct worker *worker = __worker;
2655 struct worker_pool *pool = worker->pool;
2656
2657 /* tell the scheduler that this is a workqueue worker */
2658 set_pf_worker(true);
2659woke_up:
2660 raw_spin_lock_irq(&pool->lock);
2661
2662 /* am I supposed to die? */
2663 if (unlikely(worker->flags & WORKER_DIE)) {
2664 raw_spin_unlock_irq(&pool->lock);
2665 set_pf_worker(false);
2666
	/*
	 * Self-destruct: rename the task, give the worker ID back, leave
	 * the pool and free our own struct worker before returning.
	 */
2667 set_task_comm(worker->task, "kworker/dying");
2668 ida_free(&pool->worker_ida, worker->id);
2669 worker_detach_from_pool(worker);
2670 WARN_ON_ONCE(!list_empty(&worker->entry));
2671 kfree(worker);
2672 return 0;
2673 }
2674
2675 worker_leave_idle(worker);
2676recheck:
2677 /* no more worker necessary? */
2678 if (!need_more_worker(pool))
2679 goto sleep;
2680
2681 /* do we need to manage? */
2682 if (unlikely(!may_start_working(pool)) && manage_workers(worker))
2683 goto recheck;
2684
2685 /*
2686 * ->scheduled list can only be filled while a worker is
2687 * preparing to process a work or actually processing it.
2688 * Make sure nobody diddled with it while I was sleeping.
2689 */
2690 WARN_ON_ONCE(!list_empty(&worker->scheduled));
2691
2692 /*
2693 * Finish PREP stage. We're guaranteed to have at least one idle
2694 * worker or that someone else has already assumed the manager
2695 * role. This is where @worker starts participating in concurrency
2696 * management if applicable and concurrency management is restored
2697 * after being rebound. See rebind_workers() for details.
2698 */
2699 worker_clr_flags(worker, WORKER_PREP | WORKER_REBOUND);
2700
	/*
	 * Move the first item of pool->worklist onto ->scheduled via
	 * move_linked_works() and run everything that is scheduled; repeat
	 * for as long as keep_working() holds.
	 */
2701 do {
2702 struct work_struct *work =
2703 list_first_entry(&pool->worklist,
2704 struct work_struct, entry);
2705
2706 move_linked_works(work, &worker->scheduled, NULL);
2707 process_scheduled_works(worker);
2708 } while (keep_working(pool));
2709
2710 worker_set_flags(worker, WORKER_PREP);
2711sleep:
2712 /*
2713 * pool->lock is held and there's no work to process and no need to
2714 * manage, sleep. Workers are woken up only while holding
2715 * pool->lock or from local cpu, so setting the current state
2716 * before releasing pool->lock is enough to prevent losing any
2717 * event.
2718 */
2719 worker_enter_idle(worker);
2720 __set_current_state(TASK_IDLE);
2721 raw_spin_unlock_irq(&pool->lock);
2722 schedule();
2723 goto woke_up;
2724}
2725
2726/**
2727 * rescuer_thread - the rescuer thread function
2728 * @__rescuer: self
2729 *
2730 * Workqueue rescuer thread function. There's one rescuer for each
2731 * workqueue which has WQ_MEM_RECLAIM set.
2732 *
2733 * Regular work processing on a pool may block trying to create a new
2734 * worker which uses GFP_KERNEL allocation which has slight chance of
2735 * developing into deadlock if some works currently on the same queue
2736 * need to be processed to satisfy the GFP_KERNEL allocation. This is
2737 * the problem rescuer solves.
2738 *
2739 * When such condition is possible, the pool summons rescuers of all
2740 * workqueues which have works queued on the pool and let them process
2741 * those works so that forward progress can be guaranteed.
2742 *
2743 * This should happen rarely.
2744 *
2745 * Return: 0
2746 */
2747static int rescuer_thread(void *__rescuer)
2748{
2749 struct worker *rescuer = __rescuer;
2750 struct workqueue_struct *wq = rescuer->rescue_wq;
2751 struct list_head *scheduled = &rescuer->scheduled;
2752 bool should_stop;
2753
2754 set_user_nice(current, RESCUER_NICE_LEVEL);
2755
2756 /*
2757 * Mark rescuer as worker too. As WORKER_PREP is never cleared, it
2758 * doesn't participate in concurrency management.
2759 */
2760 set_pf_worker(true);
2761repeat:
2762 set_current_state(TASK_IDLE);
2763
2764 /*
2765 * By the time the rescuer is requested to stop, the workqueue
2766 * shouldn't have any work pending, but @wq->maydays may still have
2767 * pwq(s) queued. This can happen by non-rescuer workers consuming
2768 * all the work items before the rescuer got to them. Go through
2769 * @wq->maydays processing before acting on should_stop so that the
2770 * list is always empty on exit.
2771 */
2772 should_stop = kthread_should_stop();
2773
2774 /* see whether any pwq is asking for help */
2775 raw_spin_lock_irq(&wq_mayday_lock);
2776
2777 while (!list_empty(&wq->maydays)) {
2778 struct pool_workqueue *pwq = list_first_entry(&wq->maydays,
2779 struct pool_workqueue, mayday_node);
2780 struct worker_pool *pool = pwq->pool;
2781 struct work_struct *work, *n;
2782
2783 __set_current_state(TASK_RUNNING);
2784 list_del_init(&pwq->mayday_node);
2785
2786 raw_spin_unlock_irq(&wq_mayday_lock);
2787
	/*
	 * wq_mayday_lock has been dropped above: worker_attach_to_pool()
	 * takes wq_pool_attach_mutex.
	 */
2788 worker_attach_to_pool(rescuer, pool);
2789
2790 raw_spin_lock_irq(&pool->lock);
2791
2792 /*
2793 * Slurp in all works issued via this workqueue and
2794 * process'em.
2795 */
2796 WARN_ON_ONCE(!list_empty(scheduled));
2797 list_for_each_entry_safe(work, n, &pool->worklist, entry) {
2798 if (get_work_pwq(work) == pwq) {
	/* NOTE(review): @n is passed by address, presumably so move_linked_works() can fix up the iteration cursor - confirm */
2799 move_linked_works(work, scheduled, &n);
2800 pwq->stats[PWQ_STAT_RESCUED]++;
2801 }
2802 }
2803
2804 if (!list_empty(scheduled)) {
2805 process_scheduled_works(rescuer);
2806
2807 /*
2808 * The above execution of rescued work items could
2809 * have created more to rescue through
2810 * pwq_activate_first_inactive() or chained
2811 * queueing. Let's put @pwq back on mayday list so
2812 * that such back-to-back work items, which may be
2813 * being used to relieve memory pressure, don't
2814 * incur MAYDAY_INTERVAL delay in between.
2815 */
2816 if (pwq->nr_active && need_to_create_worker(pool)) {
2817 raw_spin_lock(&wq_mayday_lock);
2818 /*
2819 * Queue iff we aren't racing destruction
2820 * and somebody else hasn't queued it already.
2821 */
2822 if (wq->rescuer && list_empty(&pwq->mayday_node)) {
2823 get_pwq(pwq);
2824 list_add_tail(&pwq->mayday_node, &wq->maydays);
2825 }
2826 raw_spin_unlock(&wq_mayday_lock);
2827 }
2828 }
2829
2830 /*
2831 * Put the reference grabbed by send_mayday(). @pool won't
2832 * go away while we're still attached to it.
2833 */
2834 put_pwq(pwq);
2835
2836 /*
2837 * Leave this pool. If need_more_worker() is %true, notify a
2838 * regular worker; otherwise, we end up with 0 concurrency
2839 * and stalling the execution.
2840 */
2841 if (need_more_worker(pool))
2842 wake_up_worker(pool);
2843
2844 raw_spin_unlock_irq(&pool->lock);
2845
2846 worker_detach_from_pool(rescuer);
2847
	/* retake wq_mayday_lock for the next check of @wq->maydays */
2848 raw_spin_lock_irq(&wq_mayday_lock);
2849 }
2850
2851 raw_spin_unlock_irq(&wq_mayday_lock);
2852
	/* the stop request sampled at the top is honoured only now, after the maydays were drained */
2853 if (should_stop) {
2854 __set_current_state(TASK_RUNNING);
2855 set_pf_worker(false);
2856 return 0;
2857 }
2858
2859 /* rescuers should never participate in concurrency management */
2860 WARN_ON_ONCE(!(rescuer->flags & WORKER_NOT_RUNNING));
2861 schedule();
2862 goto repeat;
2863}
2864
2865/**
2866 * check_flush_dependency - check for flush dependency sanity
2867 * @target_wq: workqueue being flushed
2868 * @target_work: work item being flushed (NULL for workqueue flushes)
2869 *
2870 * %current is trying to flush the whole @target_wq or @target_work on it.
2871 * If @target_wq doesn't have %WQ_MEM_RECLAIM, verify that %current is not
2872 * reclaiming memory or running on a workqueue which doesn't have
2873 * %WQ_MEM_RECLAIM as that can break forward-progress guarantee leading to
2874 * a deadlock.
2875 */
2876static void check_flush_dependency(struct workqueue_struct *target_wq,
2877 struct work_struct *target_work)
2878{
2879 work_func_t target_func = target_work ? target_work->func : NULL;
2880 struct worker *worker;
2881
2882 if (target_wq->flags & WQ_MEM_RECLAIM)
2883 return;
2884
2885 worker = current_wq_worker();
2886
2887 WARN_ONCE(current->flags & PF_MEMALLOC,
2888 "workqueue: PF_MEMALLOC task %d(%s) is flushing !WQ_MEM_RECLAIM %s:%ps",
2889 current->pid, current->comm, target_wq->name, target_func);
2890 WARN_ONCE(worker && ((worker->current_pwq->wq->flags &
2891 (WQ_MEM_RECLAIM | __WQ_LEGACY)) == WQ_MEM_RECLAIM),
2892 "workqueue: WQ_MEM_RECLAIM %s:%ps is flushing !WQ_MEM_RECLAIM %s:%ps",
2893 worker->current_pwq->wq->name, worker->current_func,
2894 target_wq->name, target_func);
2895}
2896
/*
 * On-stack barrier used by flush_work(): a special work item that
 * insert_wq_barrier() queues right behind the work being flushed and
 * whose only job is to signal @done.
 */
struct wq_barrier {
	struct work_struct	work;	/* runs wq_barrier_func() */
	struct completion	done;	/* completed by wq_barrier_func() */
	struct task_struct	*task;	/* purely informational */
};
2902
2903static void wq_barrier_func(struct work_struct *work)
2904{
2905 struct wq_barrier *barr = container_of(work, struct wq_barrier, work);
2906 complete(&barr->done);
2907}
2908
2909/**
2910 * insert_wq_barrier - insert a barrier work
2911 * @pwq: pwq to insert barrier into
2912 * @barr: wq_barrier to insert
2913 * @target: target work to attach @barr to
2914 * @worker: worker currently executing @target, NULL if @target is not executing
2915 *
2916 * @barr is linked to @target such that @barr is completed only after
2917 * @target finishes execution. Please note that the ordering
2918 * guarantee is observed only with respect to @target and on the local
2919 * cpu.
2920 *
2921 * Currently, a queued barrier can't be canceled. This is because
2922 * try_to_grab_pending() can't determine whether the work to be
2923 * grabbed is at the head of the queue and thus can't clear LINKED
2924 * flag of the previous work while there must be a valid next work
2925 * after a work with LINKED flag set.
2926 *
2927 * Note that when @worker is non-NULL, @target may be modified
2928 * underneath us, so we can't reliably determine pwq from @target.
2929 *
2930 * CONTEXT:
2931 * raw_spin_lock_irq(pool->lock).
2932 */
2933static void insert_wq_barrier(struct pool_workqueue *pwq,
2934 struct wq_barrier *barr,
2935 struct work_struct *target, struct worker *worker)
2936{
2937 unsigned int work_flags = 0;
2938 unsigned int work_color;
2939 struct list_head *head;
2940
2941 /*
2942 * debugobject calls are safe here even with pool->lock locked
2943 * as we know for sure that this will not trigger any of the
2944 * checks and call back into the fixup functions where we
2945 * might deadlock.
2946 */
2947 INIT_WORK_ONSTACK(&barr->work, wq_barrier_func);
2948 __set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(&barr->work));
2949
2950 init_completion_map(&barr->done, &target->lockdep_map);
2951
2952 barr->task = current;
2953
2954 /* The barrier work item does not participate in pwq->nr_active. */
2955 work_flags |= WORK_STRUCT_INACTIVE;
2956
2957 /*
2958 * If @target is currently being executed, schedule the
2959 * barrier to the worker; otherwise, put it after @target.
2960 */
2961 if (worker) {
2962 head = worker->scheduled.next;
2963 work_color = worker->current_color;
2964 } else {
2965 unsigned long *bits = work_data_bits(target);
2966
2967 head = target->entry.next;
2968 /* there can already be other linked works, inherit and set */
2969 work_flags |= *bits & WORK_STRUCT_LINKED;
2970 work_color = get_work_color(*bits);
2971 __set_bit(WORK_STRUCT_LINKED_BIT, bits);
2972 }
2973
2974 pwq->nr_in_flight[work_color]++;
2975 work_flags |= work_color_to_flags(work_color);
2976
2977 insert_work(pwq, &barr->work, head, work_flags);
2978}
2979
2980/**
2981 * flush_workqueue_prep_pwqs - prepare pwqs for workqueue flushing
2982 * @wq: workqueue being flushed
2983 * @flush_color: new flush color, < 0 for no-op
2984 * @work_color: new work color, < 0 for no-op
2985 *
2986 * Prepare pwqs for workqueue flushing.
2987 *
2988 * If @flush_color is non-negative, flush_color on all pwqs should be
2989 * -1. If no pwq has in-flight commands at the specified color, all
2990 * pwq->flush_color's stay at -1 and %false is returned. If any pwq
2991 * has in flight commands, its pwq->flush_color is set to
2992 * @flush_color, @wq->nr_pwqs_to_flush is updated accordingly, pwq
2993 * wakeup logic is armed and %true is returned.
2994 *
2995 * The caller should have initialized @wq->first_flusher prior to
2996 * calling this function with non-negative @flush_color. If
2997 * @flush_color is negative, no flush color update is done and %false
2998 * is returned.
2999 *
3000 * If @work_color is non-negative, all pwqs should have the same
3001 * work_color which is previous to @work_color and all will be
3002 * advanced to @work_color.
3003 *
3004 * CONTEXT:
3005 * mutex_lock(wq->mutex).
3006 *
3007 * Return:
3008 * %true if @flush_color >= 0 and there's something to flush. %false
3009 * otherwise.
3010 */
3011static bool flush_workqueue_prep_pwqs(struct workqueue_struct *wq,
3012 int flush_color, int work_color)
3013{
3014 bool wait = false;
3015 struct pool_workqueue *pwq;
3016
3017 if (flush_color >= 0) {
3018 WARN_ON_ONCE(atomic_read(&wq->nr_pwqs_to_flush));
3019 atomic_set(&wq->nr_pwqs_to_flush, 1);
3020 }
3021
3022 for_each_pwq(pwq, wq) {
3023 struct worker_pool *pool = pwq->pool;
3024
3025 raw_spin_lock_irq(&pool->lock);
3026
3027 if (flush_color >= 0) {
3028 WARN_ON_ONCE(pwq->flush_color != -1);
3029
3030 if (pwq->nr_in_flight[flush_color]) {
3031 pwq->flush_color = flush_color;
3032 atomic_inc(&wq->nr_pwqs_to_flush);
3033 wait = true;
3034 }
3035 }
3036
3037 if (work_color >= 0) {
3038 WARN_ON_ONCE(work_color != work_next_color(pwq->work_color));
3039 pwq->work_color = work_color;
3040 }
3041
3042 raw_spin_unlock_irq(&pool->lock);
3043 }
3044
3045 if (flush_color >= 0 && atomic_dec_and_test(&wq->nr_pwqs_to_flush))
3046 complete(&wq->first_flusher->done);
3047
3048 return wait;
3049}
3050
/**
 * __flush_workqueue - ensure that any scheduled work has run to completion.
 * @wq: workqueue to flush
 *
 * This function sleeps until all work items which were queued on entry
 * have finished execution, but it is not livelocked by new incoming ones.
 *
 * The implementation has two phases, both run under @wq->mutex:
 *
 * - start-to-wait: the caller takes the current work color as its flush
 *   color (or parks on the overflow list when no color is free) and then
 *   sleeps on its on-stack completion;
 * - wake-up-and-cascade: the flusher that is @wq->first_flusher on wakeup
 *   completes the other flushers of the same color, recycles the color,
 *   handles overflowed flushers and appoints the next first flusher.
 */
void __flush_workqueue(struct workqueue_struct *wq)
{
	struct wq_flusher this_flusher = {
		.list = LIST_HEAD_INIT(this_flusher.list),
		.flush_color = -1,
		.done = COMPLETION_INITIALIZER_ONSTACK_MAP(this_flusher.done, wq->lockdep_map),
	};
	int next_color;

	if (WARN_ON(!wq_online))
		return;

	/*
	 * Acquire and immediately release @wq's lockdep map.
	 * NOTE(review): presumably a pure lockdep annotation recording that
	 * flushing depends on @wq - confirm against lockdep documentation.
	 */
	lock_map_acquire(&wq->lockdep_map);
	lock_map_release(&wq->lockdep_map);

	mutex_lock(&wq->mutex);

	/*
	 * Start-to-wait phase
	 */
	next_color = work_next_color(wq->work_color);

	if (next_color != wq->flush_color) {
		/*
		 * Color space is not full.  The current work_color
		 * becomes our flush_color and work_color is advanced
		 * by one.
		 */
		WARN_ON_ONCE(!list_empty(&wq->flusher_overflow));
		this_flusher.flush_color = wq->work_color;
		wq->work_color = next_color;

		if (!wq->first_flusher) {
			/* no flush in progress, become the first flusher */
			WARN_ON_ONCE(wq->flush_color != this_flusher.flush_color);

			wq->first_flusher = &this_flusher;

			if (!flush_workqueue_prep_pwqs(wq, wq->flush_color,
						       wq->work_color)) {
				/* nothing to flush, done */
				wq->flush_color = next_color;
				wq->first_flusher = NULL;
				goto out_unlock;
			}
		} else {
			/* wait in queue */
			WARN_ON_ONCE(wq->flush_color == this_flusher.flush_color);
			list_add_tail(&this_flusher.list, &wq->flusher_queue);
			/* only advance the pwqs' work color, no flush arming */
			flush_workqueue_prep_pwqs(wq, -1, wq->work_color);
		}
	} else {
		/*
		 * Oops, color space is full, wait on overflow queue.
		 * The next flush completion will assign us
		 * flush_color and transfer to flusher_queue.
		 */
		list_add_tail(&this_flusher.list, &wq->flusher_overflow);
	}

	check_flush_dependency(wq, NULL);

	mutex_unlock(&wq->mutex);

	wait_for_completion(&this_flusher.done);

	/*
	 * Wake-up-and-cascade phase
	 *
	 * First flushers are responsible for cascading flushes and
	 * handling overflow.  Non-first flushers can simply return.
	 */
	if (READ_ONCE(wq->first_flusher) != &this_flusher)
		return;

	mutex_lock(&wq->mutex);

	/* we might have raced, check again with mutex held */
	if (wq->first_flusher != &this_flusher)
		goto out_unlock;

	WRITE_ONCE(wq->first_flusher, NULL);

	WARN_ON_ONCE(!list_empty(&this_flusher.list));
	WARN_ON_ONCE(wq->flush_color != this_flusher.flush_color);

	while (true) {
		struct wq_flusher *next, *tmp;

		/* complete all the flushers sharing the current flush color */
		list_for_each_entry_safe(next, tmp, &wq->flusher_queue, list) {
			if (next->flush_color != wq->flush_color)
				break;
			list_del_init(&next->list);
			complete(&next->done);
		}

		WARN_ON_ONCE(!list_empty(&wq->flusher_overflow) &&
			     wq->flush_color != work_next_color(wq->work_color));

		/* this flush_color is finished, advance by one */
		wq->flush_color = work_next_color(wq->flush_color);

		/* one color has been freed, handle overflow queue */
		if (!list_empty(&wq->flusher_overflow)) {
			/*
			 * Assign the same color to all overflowed
			 * flushers, advance work_color and append to
			 * flusher_queue.  This is the start-to-wait
			 * phase for these overflowed flushers.
			 */
			list_for_each_entry(tmp, &wq->flusher_overflow, list)
				tmp->flush_color = wq->work_color;

			wq->work_color = work_next_color(wq->work_color);

			list_splice_tail_init(&wq->flusher_overflow,
					      &wq->flusher_queue);
			flush_workqueue_prep_pwqs(wq, -1, wq->work_color);
		}

		if (list_empty(&wq->flusher_queue)) {
			WARN_ON_ONCE(wq->flush_color != wq->work_color);
			break;
		}

		/*
		 * Need to flush more colors.  Make the next flusher
		 * the new first flusher and arm pwqs.
		 *
		 * @next is the entry the completion loop above stopped at:
		 * the first queued flusher whose color differs from the one
		 * just finished.
		 */
		WARN_ON_ONCE(wq->flush_color == wq->work_color);
		WARN_ON_ONCE(wq->flush_color != next->flush_color);

		list_del_init(&next->list);
		wq->first_flusher = next;

		if (flush_workqueue_prep_pwqs(wq, wq->flush_color, -1))
			break;

		/*
		 * Meh... this color is already done, clear first
		 * flusher and repeat cascading.
		 */
		wq->first_flusher = NULL;
	}

out_unlock:
	mutex_unlock(&wq->mutex);
}
EXPORT_SYMBOL(__flush_workqueue);
3208
3209/**
3210 * drain_workqueue - drain a workqueue
3211 * @wq: workqueue to drain
3212 *
3213 * Wait until the workqueue becomes empty. While draining is in progress,
3214 * only chain queueing is allowed. IOW, only currently pending or running
3215 * work items on @wq can queue further work items on it. @wq is flushed
3216 * repeatedly until it becomes empty. The number of flushing is determined
3217 * by the depth of chaining and should be relatively short. Whine if it
3218 * takes too long.
3219 */
3220void drain_workqueue(struct workqueue_struct *wq)
3221{
3222 unsigned int flush_cnt = 0;
3223 struct pool_workqueue *pwq;
3224
3225 /*
3226 * __queue_work() needs to test whether there are drainers, is much
3227 * hotter than drain_workqueue() and already looks at @wq->flags.
3228 * Use __WQ_DRAINING so that queue doesn't have to check nr_drainers.
3229 */
3230 mutex_lock(&wq->mutex);
3231 if (!wq->nr_drainers++)
3232 wq->flags |= __WQ_DRAINING;
3233 mutex_unlock(&wq->mutex);
3234reflush:
3235 __flush_workqueue(wq);
3236
3237 mutex_lock(&wq->mutex);
3238
3239 for_each_pwq(pwq, wq) {
3240 bool drained;
3241
3242 raw_spin_lock_irq(&pwq->pool->lock);
3243 drained = !pwq->nr_active && list_empty(&pwq->inactive_works);
3244 raw_spin_unlock_irq(&pwq->pool->lock);
3245
3246 if (drained)
3247 continue;
3248
3249 if (++flush_cnt == 10 ||
3250 (flush_cnt % 100 == 0 && flush_cnt <= 1000))
3251 pr_warn("workqueue %s: %s() isn't complete after %u tries\n",
3252 wq->name, __func__, flush_cnt);
3253
3254 mutex_unlock(&wq->mutex);
3255 goto reflush;
3256 }
3257
3258 if (!--wq->nr_drainers)
3259 wq->flags &= ~__WQ_DRAINING;
3260 mutex_unlock(&wq->mutex);
3261}
3262EXPORT_SYMBOL_GPL(drain_workqueue);
3263
3264static bool start_flush_work(struct work_struct *work, struct wq_barrier *barr,
3265 bool from_cancel)
3266{
3267 struct worker *worker = NULL;
3268 struct worker_pool *pool;
3269 struct pool_workqueue *pwq;
3270
3271 might_sleep();
3272
3273 rcu_read_lock();
3274 pool = get_work_pool(work);
3275 if (!pool) {
3276 rcu_read_unlock();
3277 return false;
3278 }
3279
3280 raw_spin_lock_irq(&pool->lock);
3281 /* see the comment in try_to_grab_pending() with the same code */
3282 pwq = get_work_pwq(work);
3283 if (pwq) {
3284 if (unlikely(pwq->pool != pool))
3285 goto already_gone;
3286 } else {
3287 worker = find_worker_executing_work(pool, work);
3288 if (!worker)
3289 goto already_gone;
3290 pwq = worker->current_pwq;
3291 }
3292
3293 check_flush_dependency(pwq->wq, work);
3294
3295 insert_wq_barrier(pwq, barr, work, worker);
3296 raw_spin_unlock_irq(&pool->lock);
3297
3298 /*
3299 * Force a lock recursion deadlock when using flush_work() inside a
3300 * single-threaded or rescuer equipped workqueue.
3301 *
3302 * For single threaded workqueues the deadlock happens when the work
3303 * is after the work issuing the flush_work(). For rescuer equipped
3304 * workqueues the deadlock happens when the rescuer stalls, blocking
3305 * forward progress.
3306 */
3307 if (!from_cancel &&
3308 (pwq->wq->saved_max_active == 1 || pwq->wq->rescuer)) {
3309 lock_map_acquire(&pwq->wq->lockdep_map);
3310 lock_map_release(&pwq->wq->lockdep_map);
3311 }
3312 rcu_read_unlock();
3313 return true;
3314already_gone:
3315 raw_spin_unlock_irq(&pool->lock);
3316 rcu_read_unlock();
3317 return false;
3318}
3319
3320static bool __flush_work(struct work_struct *work, bool from_cancel)
3321{
3322 struct wq_barrier barr;
3323
3324 if (WARN_ON(!wq_online))
3325 return false;
3326
3327 if (WARN_ON(!work->func))
3328 return false;
3329
3330 lock_map_acquire(&work->lockdep_map);
3331 lock_map_release(&work->lockdep_map);
3332
3333 if (start_flush_work(work, &barr, from_cancel)) {
3334 wait_for_completion(&barr.done);
3335 destroy_work_on_stack(&barr.work);
3336 return true;
3337 } else {
3338 return false;
3339 }
3340}
3341
3342/**
3343 * flush_work - wait for a work to finish executing the last queueing instance
3344 * @work: the work to flush
3345 *
3346 * Wait until @work has finished execution. @work is guaranteed to be idle
3347 * on return if it hasn't been requeued since flush started.
3348 *
3349 * Return:
3350 * %true if flush_work() waited for the work to finish execution,
3351 * %false if it was already idle.
3352 */
3353bool flush_work(struct work_struct *work)
3354{
3355 return __flush_work(work, false);
3356}
3357EXPORT_SYMBOL_GPL(flush_work);
3358
/*
 * Wait entry used by __cancel_work_timer() while another task is
 * canceling the same work item; cwt_wakefn() matches wakeups on @work.
 */
struct cwt_wait {
	wait_queue_entry_t	wait;	/* entry queued on the cancel waitqueue */
	struct work_struct	*work;	/* work item this waiter is interested in */
};
3363
3364static int cwt_wakefn(wait_queue_entry_t *wait, unsigned mode, int sync, void *key)
3365{
3366 struct cwt_wait *cwait = container_of(wait, struct cwt_wait, wait);
3367
3368 if (cwait->work != key)
3369 return 0;
3370 return autoremove_wake_function(wait, mode, sync, key);
3371}
3372
/*
 * Common implementation of cancel_work_sync() and
 * cancel_delayed_work_sync().
 *
 * Repeatedly tries to grab the pending state of @work (@is_dwork is passed
 * through to try_to_grab_pending()), marks @work as being canceled, waits
 * for a running instance through __flush_work() once workqueues are
 * online, clears the work data and finally wakes one task that may be
 * waiting for this cancellation to finish.
 *
 * Returns the final try_to_grab_pending() result converted to bool; the
 * public wrappers document it as %true if @work was pending.
 */
static bool __cancel_work_timer(struct work_struct *work, bool is_dwork)
{
	/* shared by all cancelers; waiters are told apart by cwt_wakefn() */
	static DECLARE_WAIT_QUEUE_HEAD(cancel_waitq);
	unsigned long flags;
	int ret;

	do {
		ret = try_to_grab_pending(work, is_dwork, &flags);
		/*
		 * If someone else is already canceling, wait for it to
		 * finish.  flush_work() doesn't work for PREEMPT_NONE
		 * because we may get scheduled between @work's completion
		 * and the other canceling task resuming and clearing
		 * CANCELING - flush_work() will return false immediately
		 * as @work is no longer busy, try_to_grab_pending() will
		 * return -ENOENT as @work is still being canceled and the
		 * other canceling task won't be able to clear CANCELING as
		 * we're hogging the CPU.
		 *
		 * Let's wait for completion using a waitqueue.  As this
		 * may lead to the thundering herd problem, use a custom
		 * wake function which matches @work along with exclusive
		 * wait and wakeup.
		 */
		if (unlikely(ret == -ENOENT)) {
			struct cwt_wait cwait;

			init_wait(&cwait.wait);
			cwait.wait.func = cwt_wakefn;
			cwait.work = work;

			prepare_to_wait_exclusive(&cancel_waitq, &cwait.wait,
						  TASK_UNINTERRUPTIBLE);
			/* re-check after queueing ourselves to avoid a lost wakeup */
			if (work_is_canceling(work))
				schedule();
			finish_wait(&cancel_waitq, &cwait.wait);
		}
	} while (unlikely(ret < 0));

	/* tell other tasks trying to grab @work to back off */
	mark_work_canceling(work);
	/*
	 * NOTE(review): @flags was filled in by the successful
	 * try_to_grab_pending() above, which presumably returns with IRQs
	 * disabled - confirm against its definition.
	 */
	local_irq_restore(flags);

	/*
	 * This allows canceling during early boot.  We know that @work
	 * isn't executing.
	 */
	if (wq_online)
		__flush_work(work, true);

	clear_work_data(work);

	/*
	 * Paired with prepare_to_wait() above so that either
	 * waitqueue_active() is visible here or !work_is_canceling() is
	 * visible there.
	 */
	smp_mb();
	if (waitqueue_active(&cancel_waitq))
		__wake_up(&cancel_waitq, TASK_NORMAL, 1, work);

	return ret;
}
3436
3437/**
3438 * cancel_work_sync - cancel a work and wait for it to finish
3439 * @work: the work to cancel
3440 *
3441 * Cancel @work and wait for its execution to finish. This function
3442 * can be used even if the work re-queues itself or migrates to
3443 * another workqueue. On return from this function, @work is
3444 * guaranteed to be not pending or executing on any CPU.
3445 *
3446 * cancel_work_sync(&delayed_work->work) must not be used for
3447 * delayed_work's. Use cancel_delayed_work_sync() instead.
3448 *
3449 * The caller must ensure that the workqueue on which @work was last
3450 * queued can't be destroyed before this function returns.
3451 *
3452 * Return:
3453 * %true if @work was pending, %false otherwise.
3454 */
3455bool cancel_work_sync(struct work_struct *work)
3456{
3457 return __cancel_work_timer(work, false);
3458}
3459EXPORT_SYMBOL_GPL(cancel_work_sync);
3460
3461/**
3462 * flush_delayed_work - wait for a dwork to finish executing the last queueing
3463 * @dwork: the delayed work to flush
3464 *
3465 * Delayed timer is cancelled and the pending work is queued for
3466 * immediate execution. Like flush_work(), this function only
3467 * considers the last queueing instance of @dwork.
3468 *
3469 * Return:
3470 * %true if flush_work() waited for the work to finish execution,
3471 * %false if it was already idle.
3472 */
3473bool flush_delayed_work(struct delayed_work *dwork)
3474{
3475 local_irq_disable();
3476 if (del_timer_sync(&dwork->timer))
3477 __queue_work(dwork->cpu, dwork->wq, &dwork->work);
3478 local_irq_enable();
3479 return flush_work(&dwork->work);
3480}
3481EXPORT_SYMBOL(flush_delayed_work);
3482
3483/**
3484 * flush_rcu_work - wait for a rwork to finish executing the last queueing
3485 * @rwork: the rcu work to flush
3486 *
3487 * Return:
3488 * %true if flush_rcu_work() waited for the work to finish execution,
3489 * %false if it was already idle.
3490 */
3491bool flush_rcu_work(struct rcu_work *rwork)
3492{
3493 if (test_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(&rwork->work))) {
3494 rcu_barrier();
3495 flush_work(&rwork->work);
3496 return true;
3497 } else {
3498 return flush_work(&rwork->work);
3499 }
3500}
3501EXPORT_SYMBOL(flush_rcu_work);
3502
3503static bool __cancel_work(struct work_struct *work, bool is_dwork)
3504{
3505 unsigned long flags;
3506 int ret;
3507
3508 do {
3509 ret = try_to_grab_pending(work, is_dwork, &flags);
3510 } while (unlikely(ret == -EAGAIN));
3511
3512 if (unlikely(ret < 0))
3513 return false;
3514
3515 set_work_pool_and_clear_pending(work, get_work_pool_id(work));
3516 local_irq_restore(flags);
3517 return ret;
3518}
3519
3520/*
3521 * See cancel_delayed_work()
3522 */
3523bool cancel_work(struct work_struct *work)
3524{
3525 return __cancel_work(work, false);
3526}
3527EXPORT_SYMBOL(cancel_work);
3528
3529/**
3530 * cancel_delayed_work - cancel a delayed work
3531 * @dwork: delayed_work to cancel
3532 *
3533 * Kill off a pending delayed_work.
3534 *
3535 * Return: %true if @dwork was pending and canceled; %false if it wasn't
3536 * pending.
3537 *
3538 * Note:
3539 * The work callback function may still be running on return, unless
3540 * it returns %true and the work doesn't re-arm itself. Explicitly flush or
3541 * use cancel_delayed_work_sync() to wait on it.
3542 *
3543 * This function is safe to call from any context including IRQ handler.
3544 */
3545bool cancel_delayed_work(struct delayed_work *dwork)
3546{
3547 return __cancel_work(&dwork->work, true);
3548}
3549EXPORT_SYMBOL(cancel_delayed_work);
3550
3551/**
3552 * cancel_delayed_work_sync - cancel a delayed work and wait for it to finish
3553 * @dwork: the delayed work cancel
3554 *
3555 * This is cancel_work_sync() for delayed works.
3556 *
3557 * Return:
3558 * %true if @dwork was pending, %false otherwise.
3559 */
3560bool cancel_delayed_work_sync(struct delayed_work *dwork)
3561{
3562 return __cancel_work_timer(&dwork->work, true);
3563}
3564EXPORT_SYMBOL(cancel_delayed_work_sync);
3565
3566/**
3567 * schedule_on_each_cpu - execute a function synchronously on each online CPU
3568 * @func: the function to call
3569 *
3570 * schedule_on_each_cpu() executes @func on each online CPU using the
3571 * system workqueue and blocks until all CPUs have completed.
3572 * schedule_on_each_cpu() is very slow.
3573 *
3574 * Return:
3575 * 0 on success, -errno on failure.
3576 */
3577int schedule_on_each_cpu(work_func_t func)
3578{
3579 int cpu;
3580 struct work_struct __percpu *works;
3581
3582 works = alloc_percpu(struct work_struct);
3583 if (!works)
3584 return -ENOMEM;
3585
3586 cpus_read_lock();
3587
3588 for_each_online_cpu(cpu) {
3589 struct work_struct *work = per_cpu_ptr(works, cpu);
3590
3591 INIT_WORK(work, func);
3592 schedule_work_on(cpu, work);
3593 }
3594
3595 for_each_online_cpu(cpu)
3596 flush_work(per_cpu_ptr(works, cpu));
3597
3598 cpus_read_unlock();
3599 free_percpu(works);
3600 return 0;
3601}
3602
3603/**
3604 * execute_in_process_context - reliably execute the routine with user context
3605 * @fn: the function to execute
3606 * @ew: guaranteed storage for the execute work structure (must
3607 * be available when the work executes)
3608 *
3609 * Executes the function immediately if process context is available,
3610 * otherwise schedules the function for delayed execution.
3611 *
3612 * Return: 0 - function was executed
3613 * 1 - function was scheduled for execution
3614 */
3615int execute_in_process_context(work_func_t fn, struct execute_work *ew)
3616{
3617 if (!in_interrupt()) {
3618 fn(&ew->work);
3619 return 0;
3620 }
3621
3622 INIT_WORK(&ew->work, fn);
3623 schedule_work(&ew->work);
3624
3625 return 1;
3626}
3627EXPORT_SYMBOL_GPL(execute_in_process_context);
3628
3629/**
3630 * free_workqueue_attrs - free a workqueue_attrs
3631 * @attrs: workqueue_attrs to free
3632 *
3633 * Undo alloc_workqueue_attrs().
3634 */
3635void free_workqueue_attrs(struct workqueue_attrs *attrs)
3636{
3637 if (attrs) {
3638 free_cpumask_var(attrs->cpumask);
3639 kfree(attrs);
3640 }
3641}
3642
3643/**
3644 * alloc_workqueue_attrs - allocate a workqueue_attrs
3645 *
3646 * Allocate a new workqueue_attrs, initialize with default settings and
3647 * return it.
3648 *
3649 * Return: The allocated new workqueue_attr on success. %NULL on failure.
3650 */
3651struct workqueue_attrs *alloc_workqueue_attrs(void)
3652{
3653 struct workqueue_attrs *attrs;
3654
3655 attrs = kzalloc(sizeof(*attrs), GFP_KERNEL);
3656 if (!attrs)
3657 goto fail;
3658 if (!alloc_cpumask_var(&attrs->cpumask, GFP_KERNEL))
3659 goto fail;
3660
3661 cpumask_copy(attrs->cpumask, cpu_possible_mask);
3662 return attrs;
3663fail:
3664 free_workqueue_attrs(attrs);
3665 return NULL;
3666}
3667
3668static void copy_workqueue_attrs(struct workqueue_attrs *to,
3669 const struct workqueue_attrs *from)
3670{
3671 to->nice = from->nice;
3672 cpumask_copy(to->cpumask, from->cpumask);
3673 /*
3674 * Unlike hash and equality test, this function doesn't ignore
3675 * ->no_numa as it is used for both pool and wq attrs. Instead,
3676 * get_unbound_pool() explicitly clears ->no_numa after copying.
3677 */
3678 to->no_numa = from->no_numa;
3679}
3680
3681/* hash value of the content of @attr */
3682static u32 wqattrs_hash(const struct workqueue_attrs *attrs)
3683{
3684 u32 hash = 0;
3685
3686 hash = jhash_1word(attrs->nice, hash);
3687 hash = jhash(cpumask_bits(attrs->cpumask),
3688 BITS_TO_LONGS(nr_cpumask_bits) * sizeof(long), hash);
3689 return hash;
3690}
3691
3692/* content equality test */
3693static bool wqattrs_equal(const struct workqueue_attrs *a,
3694 const struct workqueue_attrs *b)
3695{
3696 if (a->nice != b->nice)
3697 return false;
3698 if (!cpumask_equal(a->cpumask, b->cpumask))
3699 return false;
3700 return true;
3701}
3702
3703/**
3704 * init_worker_pool - initialize a newly zalloc'd worker_pool
3705 * @pool: worker_pool to initialize
3706 *
3707 * Initialize a newly zalloc'd @pool. It also allocates @pool->attrs.
3708 *
3709 * Return: 0 on success, -errno on failure. Even on failure, all fields
3710 * inside @pool proper are initialized and put_unbound_pool() can be called
3711 * on @pool safely to release it.
3712 */
3713static int init_worker_pool(struct worker_pool *pool)
3714{
3715 raw_spin_lock_init(&pool->lock);
3716 pool->id = -1;
3717 pool->cpu = -1;
3718 pool->node = NUMA_NO_NODE;
3719 pool->flags |= POOL_DISASSOCIATED;
3720 pool->watchdog_ts = jiffies;
3721 INIT_LIST_HEAD(&pool->worklist);
3722 INIT_LIST_HEAD(&pool->idle_list);
3723 hash_init(pool->busy_hash);
3724
3725 timer_setup(&pool->idle_timer, idle_worker_timeout, TIMER_DEFERRABLE);
3726 INIT_WORK(&pool->idle_cull_work, idle_cull_fn);
3727
3728 timer_setup(&pool->mayday_timer, pool_mayday_timeout, 0);
3729
3730 INIT_LIST_HEAD(&pool->workers);
3731 INIT_LIST_HEAD(&pool->dying_workers);
3732
3733 ida_init(&pool->worker_ida);
3734 INIT_HLIST_NODE(&pool->hash_node);
3735 pool->refcnt = 1;
3736
3737 /* shouldn't fail above this point */
3738 pool->attrs = alloc_workqueue_attrs();
3739 if (!pool->attrs)
3740 return -ENOMEM;
3741 return 0;
3742}
3743
#ifdef CONFIG_LOCKDEP
/*
 * Register @wq's lockdep key and initialize its lockdep map. The map is
 * named "(wq_completion)<wq name>"; if that string can't be allocated the
 * plain workqueue name is used instead.
 */
static void wq_init_lockdep(struct workqueue_struct *wq)
{
	char *name;

	lockdep_register_key(&wq->key);

	name = kasprintf(GFP_KERNEL, "%s%s", "(wq_completion)", wq->name);
	wq->lock_name = name ? name : wq->name;

	lockdep_init_map(&wq->lockdep_map, wq->lock_name, &wq->key, 0);
}

/* Unregister the lockdep key registered by wq_init_lockdep(). */
static void wq_unregister_lockdep(struct workqueue_struct *wq)
{
	lockdep_unregister_key(&wq->key);
}

/* Free the lock name unless it merely aliases @wq->name. */
static void wq_free_lockdep(struct workqueue_struct *wq)
{
	if (wq->lock_name == wq->name)
		return;

	kfree(wq->lock_name);
}
#else
/* !CONFIG_LOCKDEP: the lockdep helpers are no-ops. */
static void wq_init_lockdep(struct workqueue_struct *wq)
{
}

static void wq_unregister_lockdep(struct workqueue_struct *wq)
{
}

static void wq_free_lockdep(struct workqueue_struct *wq)
{
}
#endif
3781
3782static void rcu_free_wq(struct rcu_head *rcu)
3783{
3784 struct workqueue_struct *wq =
3785 container_of(rcu, struct workqueue_struct, rcu);
3786
3787 wq_free_lockdep(wq);
3788 free_percpu(wq->cpu_pwq);
3789 free_workqueue_attrs(wq->unbound_attrs);
3790 kfree(wq);
3791}
3792
3793static void rcu_free_pool(struct rcu_head *rcu)
3794{
3795 struct worker_pool *pool = container_of(rcu, struct worker_pool, rcu);
3796
3797 ida_destroy(&pool->worker_ida);
3798 free_workqueue_attrs(pool->attrs);
3799 kfree(pool);
3800}
3801
/**
 * put_unbound_pool - put a worker_pool
 * @pool: worker_pool to put
 *
 * Put @pool.  If its refcnt reaches zero, it gets destroyed in RCU
 * safe manner.  get_unbound_pool() calls this function on its failure path
 * and this function should be able to release pools which went through,
 * successfully or not, init_worker_pool().
 *
 * Should be called with wq_pool_mutex held.
 */
static void put_unbound_pool(struct worker_pool *pool)
{
	DECLARE_COMPLETION_ONSTACK(detach_completion);
	struct worker *worker;
	LIST_HEAD(cull_list);

	lockdep_assert_held(&wq_pool_mutex);

	/* only the final put tears the pool down */
	if (--pool->refcnt)
		return;

	/* sanity checks */
	if (WARN_ON(!(pool->cpu < 0)) ||
	    WARN_ON(!list_empty(&pool->worklist)))
		return;

	/* release id and unhash */
	if (pool->id >= 0)
		idr_remove(&worker_pool_idr, pool->id);
	hash_del(&pool->hash_node);

	/*
	 * Become the manager and destroy all workers.  This prevents
	 * @pool's workers from blocking on attach_mutex.  We're the last
	 * manager and @pool gets freed with the flag set.
	 *
	 * Having a concurrent manager is quite unlikely to happen as we can
	 * only get here with
	 *   pwq->refcnt == pool->refcnt == 0
	 * which implies no work queued to the pool, which implies no worker can
	 * become the manager. However a worker could have taken the role of
	 * manager before the refcnts dropped to 0, since maybe_create_worker()
	 * drops pool->lock
	 */
	while (true) {
		rcuwait_wait_event(&manager_wait,
				   !(pool->flags & POOL_MANAGER_ACTIVE),
				   TASK_UNINTERRUPTIBLE);

		mutex_lock(&wq_pool_attach_mutex);
		raw_spin_lock_irq(&pool->lock);
		/* re-check under pool->lock; on success leave with both locks held */
		if (!(pool->flags & POOL_MANAGER_ACTIVE)) {
			pool->flags |= POOL_MANAGER_ACTIVE;
			break;
		}
		raw_spin_unlock_irq(&pool->lock);
		mutex_unlock(&wq_pool_attach_mutex);
	}

	/* mark every idle worker as dying and collect them on cull_list */
	while ((worker = first_idle_worker(pool)))
		set_worker_dying(worker, &cull_list);
	WARN_ON(pool->nr_workers || pool->nr_idle);
	raw_spin_unlock_irq(&pool->lock);

	wake_dying_workers(&cull_list);

	/*
	 * If workers are still attached, arrange to be notified through
	 * the on-stack completion.
	 * NOTE(review): presumably completed when the last worker detaches
	 * from @pool - confirm in worker_detach_from_pool().
	 */
	if (!list_empty(&pool->workers) || !list_empty(&pool->dying_workers))
		pool->detach_completion = &detach_completion;
	mutex_unlock(&wq_pool_attach_mutex);

	if (pool->detach_completion)
		wait_for_completion(pool->detach_completion);

	/* shut down the timers */
	del_timer_sync(&pool->idle_timer);
	cancel_work_sync(&pool->idle_cull_work);
	del_timer_sync(&pool->mayday_timer);

	/* RCU protected to allow dereferences from get_work_pool() */
	call_rcu(&pool->rcu, rcu_free_pool);
}
3884
3885/**
3886 * get_unbound_pool - get a worker_pool with the specified attributes
3887 * @attrs: the attributes of the worker_pool to get
3888 *
3889 * Obtain a worker_pool which has the same attributes as @attrs, bump the
3890 * reference count and return it. If there already is a matching
3891 * worker_pool, it will be used; otherwise, this function attempts to
3892 * create a new one.
3893 *
3894 * Should be called with wq_pool_mutex held.
3895 *
3896 * Return: On success, a worker_pool with the same attributes as @attrs.
3897 * On failure, %NULL.
3898 */
3899static struct worker_pool *get_unbound_pool(const struct workqueue_attrs *attrs)
3900{
3901 u32 hash = wqattrs_hash(attrs);
3902 struct worker_pool *pool;
3903 int node;
3904 int target_node = NUMA_NO_NODE;
3905
3906 lockdep_assert_held(&wq_pool_mutex);
3907
3908 /* do we already have a matching pool? */
3909 hash_for_each_possible(unbound_pool_hash, pool, hash_node, hash) {
3910 if (wqattrs_equal(pool->attrs, attrs)) {
3911 pool->refcnt++;
3912 return pool;
3913 }
3914 }
3915
3916 /* if cpumask is contained inside a NUMA node, we belong to that node */
3917 if (wq_numa_enabled) {
3918 for_each_node(node) {
3919 if (cpumask_subset(attrs->cpumask,
3920 wq_numa_possible_cpumask[node])) {
3921 target_node = node;
3922 break;
3923 }
3924 }
3925 }
3926
3927 /* nope, create a new one */
3928 pool = kzalloc_node(sizeof(*pool), GFP_KERNEL, target_node);
3929 if (!pool || init_worker_pool(pool) < 0)
3930 goto fail;
3931
3932 copy_workqueue_attrs(pool->attrs, attrs);
3933 pool->node = target_node;
3934
3935 /*
3936 * no_numa isn't a worker_pool attribute, always clear it. See
3937 * 'struct workqueue_attrs' comments for detail.
3938 */
3939 pool->attrs->no_numa = false;
3940
3941 if (worker_pool_assign_id(pool) < 0)
3942 goto fail;
3943
3944 /* create and start the initial worker */
3945 if (wq_online && !create_worker(pool))
3946 goto fail;
3947
3948 /* install */
3949 hash_add(unbound_pool_hash, &pool->hash_node, hash);
3950
3951 return pool;
3952fail:
3953 if (pool)
3954 put_unbound_pool(pool);
3955 return NULL;
3956}
3957
3958static void rcu_free_pwq(struct rcu_head *rcu)
3959{
3960 kmem_cache_free(pwq_cache,
3961 container_of(rcu, struct pool_workqueue, rcu));
3962}
3963
3964/*
3965 * Scheduled on pwq_release_worker by put_pwq() when an unbound pwq hits zero
3966 * refcnt and needs to be destroyed.
3967 */
3968static void pwq_release_workfn(struct kthread_work *work)
3969{
3970 struct pool_workqueue *pwq = container_of(work, struct pool_workqueue,
3971 release_work);
3972 struct workqueue_struct *wq = pwq->wq;
3973 struct worker_pool *pool = pwq->pool;
3974 bool is_last = false;
3975
3976 /*
3977 * When @pwq is not linked, it doesn't hold any reference to the
3978 * @wq, and @wq is invalid to access.
3979 */
3980 if (!list_empty(&pwq->pwqs_node)) {
3981 mutex_lock(&wq->mutex);
3982 list_del_rcu(&pwq->pwqs_node);
3983 is_last = list_empty(&wq->pwqs);
3984 mutex_unlock(&wq->mutex);
3985 }
3986
3987 if (wq->flags & WQ_UNBOUND) {
3988 mutex_lock(&wq_pool_mutex);
3989 put_unbound_pool(pool);
3990 mutex_unlock(&wq_pool_mutex);
3991 }
3992
3993 call_rcu(&pwq->rcu, rcu_free_pwq);
3994
3995 /*
3996 * If we're the last pwq going away, @wq is already dead and no one
3997 * is gonna access it anymore. Schedule RCU free.
3998 */
3999 if (is_last) {
4000 wq_unregister_lockdep(wq);
4001 call_rcu(&wq->rcu, rcu_free_wq);
4002 }
4003}
4004
4005/**
4006 * pwq_adjust_max_active - update a pwq's max_active to the current setting
4007 * @pwq: target pool_workqueue
4008 *
4009 * If @pwq isn't freezing, set @pwq->max_active to the associated
4010 * workqueue's saved_max_active and activate inactive work items
4011 * accordingly. If @pwq is freezing, clear @pwq->max_active to zero.
4012 */
4013static void pwq_adjust_max_active(struct pool_workqueue *pwq)
4014{
4015 struct workqueue_struct *wq = pwq->wq;
4016 bool freezable = wq->flags & WQ_FREEZABLE;
4017 unsigned long flags;
4018
4019 /* for @wq->saved_max_active */
4020 lockdep_assert_held(&wq->mutex);
4021
4022 /* fast exit for non-freezable wqs */
4023 if (!freezable && pwq->max_active == wq->saved_max_active)
4024 return;
4025
4026 /* this function can be called during early boot w/ irq disabled */
4027 raw_spin_lock_irqsave(&pwq->pool->lock, flags);
4028
4029 /*
4030 * During [un]freezing, the caller is responsible for ensuring that
4031 * this function is called at least once after @workqueue_freezing
4032 * is updated and visible.
4033 */
4034 if (!freezable || !workqueue_freezing) {
4035 bool kick = false;
4036
4037 pwq->max_active = wq->saved_max_active;
4038
4039 while (!list_empty(&pwq->inactive_works) &&
4040 pwq->nr_active < pwq->max_active) {
4041 pwq_activate_first_inactive(pwq);
4042 kick = true;
4043 }
4044
4045 /*
4046 * Need to kick a worker after thawed or an unbound wq's
4047 * max_active is bumped. In realtime scenarios, always kicking a
4048 * worker will cause interference on the isolated cpu cores, so
4049 * let's kick iff work items were activated.
4050 */
4051 if (kick)
4052 wake_up_worker(pwq->pool);
4053 } else {
4054 pwq->max_active = 0;
4055 }
4056
4057 raw_spin_unlock_irqrestore(&pwq->pool->lock, flags);
4058}
4059
4060/* initialize newly allocated @pwq which is associated with @wq and @pool */
4061static void init_pwq(struct pool_workqueue *pwq, struct workqueue_struct *wq,
4062 struct worker_pool *pool)
4063{
4064 BUG_ON((unsigned long)pwq & WORK_STRUCT_FLAG_MASK);
4065
4066 memset(pwq, 0, sizeof(*pwq));
4067
4068 pwq->pool = pool;
4069 pwq->wq = wq;
4070 pwq->flush_color = -1;
4071 pwq->refcnt = 1;
4072 INIT_LIST_HEAD(&pwq->inactive_works);
4073 INIT_LIST_HEAD(&pwq->pwqs_node);
4074 INIT_LIST_HEAD(&pwq->mayday_node);
4075 kthread_init_work(&pwq->release_work, pwq_release_workfn);
4076}
4077
4078/* sync @pwq with the current state of its associated wq and link it */
4079static void link_pwq(struct pool_workqueue *pwq)
4080{
4081 struct workqueue_struct *wq = pwq->wq;
4082
4083 lockdep_assert_held(&wq->mutex);
4084
4085 /* may be called multiple times, ignore if already linked */
4086 if (!list_empty(&pwq->pwqs_node))
4087 return;
4088
4089 /* set the matching work_color */
4090 pwq->work_color = wq->work_color;
4091
4092 /* sync max_active to the current setting */
4093 pwq_adjust_max_active(pwq);
4094
4095 /* link in @pwq */
4096 list_add_rcu(&pwq->pwqs_node, &wq->pwqs);
4097}
4098
4099/* obtain a pool matching @attr and create a pwq associating the pool and @wq */
4100static struct pool_workqueue *alloc_unbound_pwq(struct workqueue_struct *wq,
4101 const struct workqueue_attrs *attrs)
4102{
4103 struct worker_pool *pool;
4104 struct pool_workqueue *pwq;
4105
4106 lockdep_assert_held(&wq_pool_mutex);
4107
4108 pool = get_unbound_pool(attrs);
4109 if (!pool)
4110 return NULL;
4111
4112 pwq = kmem_cache_alloc_node(pwq_cache, GFP_KERNEL, pool->node);
4113 if (!pwq) {
4114 put_unbound_pool(pool);
4115 return NULL;
4116 }
4117
4118 init_pwq(pwq, wq, pool);
4119 return pwq;
4120}
4121
4122/**
4123 * wq_calc_node_cpumask - calculate a wq_attrs' cpumask for the specified node
4124 * @attrs: the wq_attrs of the default pwq of the target workqueue
4125 * @node: the target NUMA node
4126 * @cpu_going_down: if >= 0, the CPU to consider as offline
4127 * @cpumask: outarg, the resulting cpumask
4128 *
4129 * Calculate the cpumask a workqueue with @attrs should use on @node. If
4130 * @cpu_going_down is >= 0, that cpu is considered offline during
4131 * calculation. The result is stored in @cpumask.
4132 *
4133 * If NUMA affinity is not enabled, @attrs->cpumask is always used. If
4134 * enabled and @node has online CPUs requested by @attrs, the returned
4135 * cpumask is the intersection of the possible CPUs of @node and
4136 * @attrs->cpumask.
4137 *
4138 * The caller is responsible for ensuring that the cpumask of @node stays
4139 * stable.
4140 */
4141static void wq_calc_node_cpumask(const struct workqueue_attrs *attrs, int node,
4142 int cpu_going_down, cpumask_t *cpumask)
4143{
4144 if (!wq_numa_enabled || attrs->no_numa)
4145 goto use_dfl;
4146
4147 /* does @node have any online CPUs @attrs wants? */
4148 cpumask_and(cpumask, cpumask_of_node(node), attrs->cpumask);
4149 if (cpu_going_down >= 0)
4150 cpumask_clear_cpu(cpu_going_down, cpumask);
4151
4152 if (cpumask_empty(cpumask))
4153 goto use_dfl;
4154
4155 /* yeap, return possible CPUs in @node that @attrs wants */
4156 cpumask_and(cpumask, attrs->cpumask, wq_numa_possible_cpumask[node]);
4157
4158 if (cpumask_empty(cpumask))
4159 pr_warn_once("WARNING: workqueue cpumask: online intersect > "
4160 "possible intersect\n");
4161 return;
4162
4163use_dfl:
4164 cpumask_copy(cpumask, attrs->cpumask);
4165}
4166
4167/* install @pwq into @wq's cpu_pwq and return the old pwq */
4168static struct pool_workqueue *install_unbound_pwq(struct workqueue_struct *wq,
4169 int cpu, struct pool_workqueue *pwq)
4170{
4171 struct pool_workqueue *old_pwq;
4172
4173 lockdep_assert_held(&wq_pool_mutex);
4174 lockdep_assert_held(&wq->mutex);
4175
4176 /* link_pwq() can handle duplicate calls */
4177 link_pwq(pwq);
4178
4179 old_pwq = rcu_access_pointer(*per_cpu_ptr(wq->cpu_pwq, cpu));
4180 rcu_assign_pointer(*per_cpu_ptr(wq->cpu_pwq, cpu), pwq);
4181 return old_pwq;
4182}
4183
4184/* context to store the prepared attrs & pwqs before applying */
4185struct apply_wqattrs_ctx {
4186 struct workqueue_struct *wq; /* target workqueue */
4187 struct workqueue_attrs *attrs; /* attrs to apply */
4188 struct list_head list; /* queued for batching commit */
4189 struct pool_workqueue *dfl_pwq;
4190 struct pool_workqueue *pwq_tbl[];
4191};
4192
4193/* free the resources after success or abort */
4194static void apply_wqattrs_cleanup(struct apply_wqattrs_ctx *ctx)
4195{
4196 if (ctx) {
4197 int cpu;
4198
4199 for_each_possible_cpu(cpu)
4200 put_pwq_unlocked(ctx->pwq_tbl[cpu]);
4201 put_pwq_unlocked(ctx->dfl_pwq);
4202
4203 free_workqueue_attrs(ctx->attrs);
4204
4205 kfree(ctx);
4206 }
4207}
4208
4209/* allocate the attrs and pwqs for later installation */
4210static struct apply_wqattrs_ctx *
4211apply_wqattrs_prepare(struct workqueue_struct *wq,
4212 const struct workqueue_attrs *attrs,
4213 const cpumask_var_t unbound_cpumask)
4214{
4215 struct apply_wqattrs_ctx *ctx;
4216 struct workqueue_attrs *new_attrs, *tmp_attrs;
4217 int cpu;
4218
4219 lockdep_assert_held(&wq_pool_mutex);
4220
4221 ctx = kzalloc(struct_size(ctx, pwq_tbl, nr_cpu_ids), GFP_KERNEL);
4222
4223 new_attrs = alloc_workqueue_attrs();
4224 tmp_attrs = alloc_workqueue_attrs();
4225 if (!ctx || !new_attrs || !tmp_attrs)
4226 goto out_free;
4227
4228 /*
4229 * Calculate the attrs of the default pwq with unbound_cpumask
4230 * which is wq_unbound_cpumask or to set to wq_unbound_cpumask.
4231 * If the user configured cpumask doesn't overlap with the
4232 * wq_unbound_cpumask, we fallback to the wq_unbound_cpumask.
4233 */
4234 copy_workqueue_attrs(new_attrs, attrs);
4235 cpumask_and(new_attrs->cpumask, new_attrs->cpumask, unbound_cpumask);
4236 if (unlikely(cpumask_empty(new_attrs->cpumask)))
4237 cpumask_copy(new_attrs->cpumask, unbound_cpumask);
4238
4239 /*
4240 * We may create multiple pwqs with differing cpumasks. Make a
4241 * copy of @new_attrs which will be modified and used to obtain
4242 * pools.
4243 */
4244 copy_workqueue_attrs(tmp_attrs, new_attrs);
4245
4246 /*
4247 * If something goes wrong during CPU up/down, we'll fall back to
4248 * the default pwq covering whole @attrs->cpumask. Always create
4249 * it even if we don't use it immediately.
4250 */
4251 ctx->dfl_pwq = alloc_unbound_pwq(wq, new_attrs);
4252 if (!ctx->dfl_pwq)
4253 goto out_free;
4254
4255 for_each_possible_cpu(cpu) {
4256 if (new_attrs->no_numa) {
4257 ctx->dfl_pwq->refcnt++;
4258 ctx->pwq_tbl[cpu] = ctx->dfl_pwq;
4259 } else {
4260 wq_calc_node_cpumask(new_attrs, cpu_to_node(cpu), -1,
4261 tmp_attrs->cpumask);
4262 ctx->pwq_tbl[cpu] = alloc_unbound_pwq(wq, tmp_attrs);
4263 if (!ctx->pwq_tbl[cpu])
4264 goto out_free;
4265 }
4266 }
4267
4268 /* save the user configured attrs and sanitize it. */
4269 copy_workqueue_attrs(new_attrs, attrs);
4270 cpumask_and(new_attrs->cpumask, new_attrs->cpumask, cpu_possible_mask);
4271 ctx->attrs = new_attrs;
4272
4273 ctx->wq = wq;
4274 free_workqueue_attrs(tmp_attrs);
4275 return ctx;
4276
4277out_free:
4278 free_workqueue_attrs(tmp_attrs);
4279 free_workqueue_attrs(new_attrs);
4280 apply_wqattrs_cleanup(ctx);
4281 return NULL;
4282}
4283
4284/* set attrs and install prepared pwqs, @ctx points to old pwqs on return */
4285static void apply_wqattrs_commit(struct apply_wqattrs_ctx *ctx)
4286{
4287 int cpu;
4288
4289 /* all pwqs have been created successfully, let's install'em */
4290 mutex_lock(&ctx->wq->mutex);
4291
4292 copy_workqueue_attrs(ctx->wq->unbound_attrs, ctx->attrs);
4293
4294 /* save the previous pwq and install the new one */
4295 for_each_possible_cpu(cpu)
4296 ctx->pwq_tbl[cpu] = install_unbound_pwq(ctx->wq, cpu,
4297 ctx->pwq_tbl[cpu]);
4298
4299 /* @dfl_pwq might not have been used, ensure it's linked */
4300 link_pwq(ctx->dfl_pwq);
4301 swap(ctx->wq->dfl_pwq, ctx->dfl_pwq);
4302
4303 mutex_unlock(&ctx->wq->mutex);
4304}
4305
4306static void apply_wqattrs_lock(void)
4307{
4308 /* CPUs should stay stable across pwq creations and installations */
4309 cpus_read_lock();
4310 mutex_lock(&wq_pool_mutex);
4311}
4312
4313static void apply_wqattrs_unlock(void)
4314{
4315 mutex_unlock(&wq_pool_mutex);
4316 cpus_read_unlock();
4317}
4318
4319static int apply_workqueue_attrs_locked(struct workqueue_struct *wq,
4320 const struct workqueue_attrs *attrs)
4321{
4322 struct apply_wqattrs_ctx *ctx;
4323
4324 /* only unbound workqueues can change attributes */
4325 if (WARN_ON(!(wq->flags & WQ_UNBOUND)))
4326 return -EINVAL;
4327
4328 /* creating multiple pwqs breaks ordering guarantee */
4329 if (!list_empty(&wq->pwqs)) {
4330 if (WARN_ON(wq->flags & __WQ_ORDERED_EXPLICIT))
4331 return -EINVAL;
4332
4333 wq->flags &= ~__WQ_ORDERED;
4334 }
4335
4336 ctx = apply_wqattrs_prepare(wq, attrs, wq_unbound_cpumask);
4337 if (!ctx)
4338 return -ENOMEM;
4339
4340 /* the ctx has been prepared successfully, let's commit it */
4341 apply_wqattrs_commit(ctx);
4342 apply_wqattrs_cleanup(ctx);
4343
4344 return 0;
4345}
4346
4347/**
4348 * apply_workqueue_attrs - apply new workqueue_attrs to an unbound workqueue
4349 * @wq: the target workqueue
4350 * @attrs: the workqueue_attrs to apply, allocated with alloc_workqueue_attrs()
4351 *
4352 * Apply @attrs to an unbound workqueue @wq. Unless disabled, on NUMA
4353 * machines, this function maps a separate pwq to each NUMA node with
4354 * possibles CPUs in @attrs->cpumask so that work items are affine to the
4355 * NUMA node it was issued on. Older pwqs are released as in-flight work
4356 * items finish. Note that a work item which repeatedly requeues itself
4357 * back-to-back will stay on its current pwq.
4358 *
4359 * Performs GFP_KERNEL allocations.
4360 *
4361 * Assumes caller has CPU hotplug read exclusion, i.e. cpus_read_lock().
4362 *
4363 * Return: 0 on success and -errno on failure.
4364 */
4365int apply_workqueue_attrs(struct workqueue_struct *wq,
4366 const struct workqueue_attrs *attrs)
4367{
4368 int ret;
4369
4370 lockdep_assert_cpus_held();
4371
4372 mutex_lock(&wq_pool_mutex);
4373 ret = apply_workqueue_attrs_locked(wq, attrs);
4374 mutex_unlock(&wq_pool_mutex);
4375
4376 return ret;
4377}
4378
4379/**
4380 * wq_update_unbound_numa - update NUMA affinity of a wq for CPU hot[un]plug
4381 * @wq: the target workqueue
4382 * @cpu: the CPU to update pool association for
4383 * @hotplug_cpu: the CPU coming up or going down
4384 * @online: whether @cpu is coming up or going down
4385 *
4386 * This function is to be called from %CPU_DOWN_PREPARE, %CPU_ONLINE and
4387 * %CPU_DOWN_FAILED. @cpu is being hot[un]plugged, update NUMA affinity of
4388 * @wq accordingly.
4389 *
4390 * If NUMA affinity can't be adjusted due to memory allocation failure, it
4391 * falls back to @wq->dfl_pwq which may not be optimal but is always
4392 * correct.
4393 *
4394 * Note that when the last allowed CPU of a NUMA node goes offline for a
4395 * workqueue with a cpumask spanning multiple nodes, the workers which were
4396 * already executing the work items for the workqueue will lose their CPU
4397 * affinity and may execute on any CPU. This is similar to how per-cpu
4398 * workqueues behave on CPU_DOWN. If a workqueue user wants strict
4399 * affinity, it's the user's responsibility to flush the work item from
4400 * CPU_DOWN_PREPARE.
4401 */
4402static void wq_update_unbound_numa(struct workqueue_struct *wq, int cpu,
4403 int hotplug_cpu, bool online)
4404{
4405 int node = cpu_to_node(cpu);
4406 int off_cpu = online ? -1 : hotplug_cpu;
4407 struct pool_workqueue *old_pwq = NULL, *pwq;
4408 struct workqueue_attrs *target_attrs;
4409 cpumask_t *cpumask;
4410
4411 lockdep_assert_held(&wq_pool_mutex);
4412
4413 if (!wq_numa_enabled || !(wq->flags & WQ_UNBOUND) ||
4414 wq->unbound_attrs->no_numa)
4415 return;
4416
4417 /*
4418 * We don't wanna alloc/free wq_attrs for each wq for each CPU.
4419 * Let's use a preallocated one. The following buf is protected by
4420 * CPU hotplug exclusion.
4421 */
4422 target_attrs = wq_update_unbound_numa_attrs_buf;
4423 cpumask = target_attrs->cpumask;
4424
4425 copy_workqueue_attrs(target_attrs, wq->unbound_attrs);
4426
4427 /* nothing to do if the target cpumask matches the current pwq */
4428 wq_calc_node_cpumask(wq->dfl_pwq->pool->attrs, node, off_cpu, cpumask);
4429 pwq = rcu_dereference_protected(*per_cpu_ptr(wq->cpu_pwq, cpu),
4430 lockdep_is_held(&wq_pool_mutex));
4431 if (cpumask_equal(cpumask, pwq->pool->attrs->cpumask))
4432 return;
4433
4434 /* create a new pwq */
4435 pwq = alloc_unbound_pwq(wq, target_attrs);
4436 if (!pwq) {
4437 pr_warn("workqueue: allocation failed while updating NUMA affinity of \"%s\"\n",
4438 wq->name);
4439 goto use_dfl_pwq;
4440 }
4441
4442 /* Install the new pwq. */
4443 mutex_lock(&wq->mutex);
4444 old_pwq = install_unbound_pwq(wq, cpu, pwq);
4445 goto out_unlock;
4446
4447use_dfl_pwq:
4448 mutex_lock(&wq->mutex);
4449 raw_spin_lock_irq(&wq->dfl_pwq->pool->lock);
4450 get_pwq(wq->dfl_pwq);
4451 raw_spin_unlock_irq(&wq->dfl_pwq->pool->lock);
4452 old_pwq = install_unbound_pwq(wq, cpu, wq->dfl_pwq);
4453out_unlock:
4454 mutex_unlock(&wq->mutex);
4455 put_pwq_unlocked(old_pwq);
4456}
4457
4458static int alloc_and_link_pwqs(struct workqueue_struct *wq)
4459{
4460 bool highpri = wq->flags & WQ_HIGHPRI;
4461 int cpu, ret;
4462
4463 wq->cpu_pwq = alloc_percpu(struct pool_workqueue *);
4464 if (!wq->cpu_pwq)
4465 goto enomem;
4466
4467 if (!(wq->flags & WQ_UNBOUND)) {
4468 for_each_possible_cpu(cpu) {
4469 struct pool_workqueue **pwq_p =
4470 per_cpu_ptr(wq->cpu_pwq, cpu);
4471 struct worker_pool *pool =
4472 &(per_cpu_ptr(cpu_worker_pools, cpu)[highpri]);
4473
4474 *pwq_p = kmem_cache_alloc_node(pwq_cache, GFP_KERNEL,
4475 pool->node);
4476 if (!*pwq_p)
4477 goto enomem;
4478
4479 init_pwq(*pwq_p, wq, pool);
4480
4481 mutex_lock(&wq->mutex);
4482 link_pwq(*pwq_p);
4483 mutex_unlock(&wq->mutex);
4484 }
4485 return 0;
4486 }
4487
4488 cpus_read_lock();
4489 if (wq->flags & __WQ_ORDERED) {
4490 ret = apply_workqueue_attrs(wq, ordered_wq_attrs[highpri]);
4491 /* there should only be single pwq for ordering guarantee */
4492 WARN(!ret && (wq->pwqs.next != &wq->dfl_pwq->pwqs_node ||
4493 wq->pwqs.prev != &wq->dfl_pwq->pwqs_node),
4494 "ordering guarantee broken for workqueue %s\n", wq->name);
4495 } else {
4496 ret = apply_workqueue_attrs(wq, unbound_std_wq_attrs[highpri]);
4497 }
4498 cpus_read_unlock();
4499
4500 return ret;
4501
4502enomem:
4503 if (wq->cpu_pwq) {
4504 for_each_possible_cpu(cpu)
4505 kfree(*per_cpu_ptr(wq->cpu_pwq, cpu));
4506 free_percpu(wq->cpu_pwq);
4507 wq->cpu_pwq = NULL;
4508 }
4509 return -ENOMEM;
4510}
4511
4512static int wq_clamp_max_active(int max_active, unsigned int flags,
4513 const char *name)
4514{
4515 if (max_active < 1 || max_active > WQ_MAX_ACTIVE)
4516 pr_warn("workqueue: max_active %d requested for %s is out of range, clamping between %d and %d\n",
4517 max_active, name, 1, WQ_MAX_ACTIVE);
4518
4519 return clamp_val(max_active, 1, WQ_MAX_ACTIVE);
4520}
4521
4522/*
4523 * Workqueues which may be used during memory reclaim should have a rescuer
4524 * to guarantee forward progress.
4525 */
4526static int init_rescuer(struct workqueue_struct *wq)
4527{
4528 struct worker *rescuer;
4529 int ret;
4530
4531 if (!(wq->flags & WQ_MEM_RECLAIM))
4532 return 0;
4533
4534 rescuer = alloc_worker(NUMA_NO_NODE);
4535 if (!rescuer) {
4536 pr_err("workqueue: Failed to allocate a rescuer for wq \"%s\"\n",
4537 wq->name);
4538 return -ENOMEM;
4539 }
4540
4541 rescuer->rescue_wq = wq;
4542 rescuer->task = kthread_create(rescuer_thread, rescuer, "%s", wq->name);
4543 if (IS_ERR(rescuer->task)) {
4544 ret = PTR_ERR(rescuer->task);
4545 pr_err("workqueue: Failed to create a rescuer kthread for wq \"%s\": %pe",
4546 wq->name, ERR_PTR(ret));
4547 kfree(rescuer);
4548 return ret;
4549 }
4550
4551 wq->rescuer = rescuer;
4552 kthread_bind_mask(rescuer->task, cpu_possible_mask);
4553 wake_up_process(rescuer->task);
4554
4555 return 0;
4556}
4557
4558__printf(1, 4)
4559struct workqueue_struct *alloc_workqueue(const char *fmt,
4560 unsigned int flags,
4561 int max_active, ...)
4562{
4563 va_list args;
4564 struct workqueue_struct *wq;
4565 struct pool_workqueue *pwq;
4566
4567 /*
4568 * Unbound && max_active == 1 used to imply ordered, which is no
4569 * longer the case on NUMA machines due to per-node pools. While
4570 * alloc_ordered_workqueue() is the right way to create an ordered
4571 * workqueue, keep the previous behavior to avoid subtle breakages
4572 * on NUMA.
4573 */
4574 if ((flags & WQ_UNBOUND) && max_active == 1)
4575 flags |= __WQ_ORDERED;
4576
4577 /* see the comment above the definition of WQ_POWER_EFFICIENT */
4578 if ((flags & WQ_POWER_EFFICIENT) && wq_power_efficient)
4579 flags |= WQ_UNBOUND;
4580
4581 /* allocate wq and format name */
4582 wq = kzalloc(sizeof(*wq), GFP_KERNEL);
4583 if (!wq)
4584 return NULL;
4585
4586 if (flags & WQ_UNBOUND) {
4587 wq->unbound_attrs = alloc_workqueue_attrs();
4588 if (!wq->unbound_attrs)
4589 goto err_free_wq;
4590 }
4591
4592 va_start(args, max_active);
4593 vsnprintf(wq->name, sizeof(wq->name), fmt, args);
4594 va_end(args);
4595
4596 max_active = max_active ?: WQ_DFL_ACTIVE;
4597 max_active = wq_clamp_max_active(max_active, flags, wq->name);
4598
4599 /* init wq */
4600 wq->flags = flags;
4601 wq->saved_max_active = max_active;
4602 mutex_init(&wq->mutex);
4603 atomic_set(&wq->nr_pwqs_to_flush, 0);
4604 INIT_LIST_HEAD(&wq->pwqs);
4605 INIT_LIST_HEAD(&wq->flusher_queue);
4606 INIT_LIST_HEAD(&wq->flusher_overflow);
4607 INIT_LIST_HEAD(&wq->maydays);
4608
4609 wq_init_lockdep(wq);
4610 INIT_LIST_HEAD(&wq->list);
4611
4612 if (alloc_and_link_pwqs(wq) < 0)
4613 goto err_unreg_lockdep;
4614
4615 if (wq_online && init_rescuer(wq) < 0)
4616 goto err_destroy;
4617
4618 if ((wq->flags & WQ_SYSFS) && workqueue_sysfs_register(wq))
4619 goto err_destroy;
4620
4621 /*
4622 * wq_pool_mutex protects global freeze state and workqueues list.
4623 * Grab it, adjust max_active and add the new @wq to workqueues
4624 * list.
4625 */
4626 mutex_lock(&wq_pool_mutex);
4627
4628 mutex_lock(&wq->mutex);
4629 for_each_pwq(pwq, wq)
4630 pwq_adjust_max_active(pwq);
4631 mutex_unlock(&wq->mutex);
4632
4633 list_add_tail_rcu(&wq->list, &workqueues);
4634
4635 mutex_unlock(&wq_pool_mutex);
4636
4637 return wq;
4638
4639err_unreg_lockdep:
4640 wq_unregister_lockdep(wq);
4641 wq_free_lockdep(wq);
4642err_free_wq:
4643 free_workqueue_attrs(wq->unbound_attrs);
4644 kfree(wq);
4645 return NULL;
4646err_destroy:
4647 destroy_workqueue(wq);
4648 return NULL;
4649}
4650EXPORT_SYMBOL_GPL(alloc_workqueue);
4651
4652static bool pwq_busy(struct pool_workqueue *pwq)
4653{
4654 int i;
4655
4656 for (i = 0; i < WORK_NR_COLORS; i++)
4657 if (pwq->nr_in_flight[i])
4658 return true;
4659
4660 if ((pwq != pwq->wq->dfl_pwq) && (pwq->refcnt > 1))
4661 return true;
4662 if (pwq->nr_active || !list_empty(&pwq->inactive_works))
4663 return true;
4664
4665 return false;
4666}
4667
4668/**
4669 * destroy_workqueue - safely terminate a workqueue
4670 * @wq: target workqueue
4671 *
4672 * Safely destroy a workqueue. All work currently pending will be done first.
4673 */
4674void destroy_workqueue(struct workqueue_struct *wq)
4675{
4676 struct pool_workqueue *pwq;
4677 int cpu;
4678
4679 /*
4680 * Remove it from sysfs first so that sanity check failure doesn't
4681 * lead to sysfs name conflicts.
4682 */
4683 workqueue_sysfs_unregister(wq);
4684
4685 /* mark the workqueue destruction is in progress */
4686 mutex_lock(&wq->mutex);
4687 wq->flags |= __WQ_DESTROYING;
4688 mutex_unlock(&wq->mutex);
4689
4690 /* drain it before proceeding with destruction */
4691 drain_workqueue(wq);
4692
4693 /* kill rescuer, if sanity checks fail, leave it w/o rescuer */
4694 if (wq->rescuer) {
4695 struct worker *rescuer = wq->rescuer;
4696
4697 /* this prevents new queueing */
4698 raw_spin_lock_irq(&wq_mayday_lock);
4699 wq->rescuer = NULL;
4700 raw_spin_unlock_irq(&wq_mayday_lock);
4701
4702 /* rescuer will empty maydays list before exiting */
4703 kthread_stop(rescuer->task);
4704 kfree(rescuer);
4705 }
4706
4707 /*
4708 * Sanity checks - grab all the locks so that we wait for all
4709 * in-flight operations which may do put_pwq().
4710 */
4711 mutex_lock(&wq_pool_mutex);
4712 mutex_lock(&wq->mutex);
4713 for_each_pwq(pwq, wq) {
4714 raw_spin_lock_irq(&pwq->pool->lock);
4715 if (WARN_ON(pwq_busy(pwq))) {
4716 pr_warn("%s: %s has the following busy pwq\n",
4717 __func__, wq->name);
4718 show_pwq(pwq);
4719 raw_spin_unlock_irq(&pwq->pool->lock);
4720 mutex_unlock(&wq->mutex);
4721 mutex_unlock(&wq_pool_mutex);
4722 show_one_workqueue(wq);
4723 return;
4724 }
4725 raw_spin_unlock_irq(&pwq->pool->lock);
4726 }
4727 mutex_unlock(&wq->mutex);
4728
4729 /*
4730 * wq list is used to freeze wq, remove from list after
4731 * flushing is complete in case freeze races us.
4732 */
4733 list_del_rcu(&wq->list);
4734 mutex_unlock(&wq_pool_mutex);
4735
4736 /*
4737 * We're the sole accessor of @wq. Directly access cpu_pwq and dfl_pwq
4738 * to put the base refs. @wq will be auto-destroyed from the last
4739 * pwq_put. RCU read lock prevents @wq from going away from under us.
4740 */
4741 rcu_read_lock();
4742
4743 for_each_possible_cpu(cpu) {
4744 pwq = rcu_access_pointer(*per_cpu_ptr(wq->cpu_pwq, cpu));
4745 RCU_INIT_POINTER(*per_cpu_ptr(wq->cpu_pwq, cpu), NULL);
4746 put_pwq_unlocked(pwq);
4747 }
4748
4749 put_pwq_unlocked(wq->dfl_pwq);
4750 wq->dfl_pwq = NULL;
4751
4752 rcu_read_unlock();
4753}
4754EXPORT_SYMBOL_GPL(destroy_workqueue);
4755
4756/**
4757 * workqueue_set_max_active - adjust max_active of a workqueue
4758 * @wq: target workqueue
4759 * @max_active: new max_active value.
4760 *
4761 * Set max_active of @wq to @max_active.
4762 *
4763 * CONTEXT:
4764 * Don't call from IRQ context.
4765 */
4766void workqueue_set_max_active(struct workqueue_struct *wq, int max_active)
4767{
4768 struct pool_workqueue *pwq;
4769
4770 /* disallow meddling with max_active for ordered workqueues */
4771 if (WARN_ON(wq->flags & __WQ_ORDERED_EXPLICIT))
4772 return;
4773
4774 max_active = wq_clamp_max_active(max_active, wq->flags, wq->name);
4775
4776 mutex_lock(&wq->mutex);
4777
4778 wq->flags &= ~__WQ_ORDERED;
4779 wq->saved_max_active = max_active;
4780
4781 for_each_pwq(pwq, wq)
4782 pwq_adjust_max_active(pwq);
4783
4784 mutex_unlock(&wq->mutex);
4785}
4786EXPORT_SYMBOL_GPL(workqueue_set_max_active);
4787
4788/**
4789 * current_work - retrieve %current task's work struct
4790 *
4791 * Determine if %current task is a workqueue worker and what it's working on.
4792 * Useful to find out the context that the %current task is running in.
4793 *
4794 * Return: work struct if %current task is a workqueue worker, %NULL otherwise.
4795 */
4796struct work_struct *current_work(void)
4797{
4798 struct worker *worker = current_wq_worker();
4799
4800 return worker ? worker->current_work : NULL;
4801}
4802EXPORT_SYMBOL(current_work);
4803
4804/**
4805 * current_is_workqueue_rescuer - is %current workqueue rescuer?
4806 *
4807 * Determine whether %current is a workqueue rescuer. Can be used from
4808 * work functions to determine whether it's being run off the rescuer task.
4809 *
4810 * Return: %true if %current is a workqueue rescuer. %false otherwise.
4811 */
4812bool current_is_workqueue_rescuer(void)
4813{
4814 struct worker *worker = current_wq_worker();
4815
4816 return worker && worker->rescue_wq;
4817}
4818
4819/**
4820 * workqueue_congested - test whether a workqueue is congested
4821 * @cpu: CPU in question
4822 * @wq: target workqueue
4823 *
4824 * Test whether @wq's cpu workqueue for @cpu is congested. There is
4825 * no synchronization around this function and the test result is
4826 * unreliable and only useful as advisory hints or for debugging.
4827 *
4828 * If @cpu is WORK_CPU_UNBOUND, the test is performed on the local CPU.
4829 *
4830 * With the exception of ordered workqueues, all workqueues have per-cpu
4831 * pool_workqueues, each with its own congested state. A workqueue being
4832 * congested on one CPU doesn't mean that the workqueue is contested on any
4833 * other CPUs.
4834 *
4835 * Return:
4836 * %true if congested, %false otherwise.
4837 */
4838bool workqueue_congested(int cpu, struct workqueue_struct *wq)
4839{
4840 struct pool_workqueue *pwq;
4841 bool ret;
4842
4843 rcu_read_lock();
4844 preempt_disable();
4845
4846 if (cpu == WORK_CPU_UNBOUND)
4847 cpu = smp_processor_id();
4848
4849 pwq = *per_cpu_ptr(wq->cpu_pwq, cpu);
4850 ret = !list_empty(&pwq->inactive_works);
4851
4852 preempt_enable();
4853 rcu_read_unlock();
4854
4855 return ret;
4856}
4857EXPORT_SYMBOL_GPL(workqueue_congested);
4858
4859/**
4860 * work_busy - test whether a work is currently pending or running
4861 * @work: the work to be tested
4862 *
4863 * Test whether @work is currently pending or running. There is no
4864 * synchronization around this function and the test result is
4865 * unreliable and only useful as advisory hints or for debugging.
4866 *
4867 * Return:
4868 * OR'd bitmask of WORK_BUSY_* bits.
4869 */
4870unsigned int work_busy(struct work_struct *work)
4871{
4872 struct worker_pool *pool;
4873 unsigned long flags;
4874 unsigned int ret = 0;
4875
4876 if (work_pending(work))
4877 ret |= WORK_BUSY_PENDING;
4878
4879 rcu_read_lock();
4880 pool = get_work_pool(work);
4881 if (pool) {
4882 raw_spin_lock_irqsave(&pool->lock, flags);
4883 if (find_worker_executing_work(pool, work))
4884 ret |= WORK_BUSY_RUNNING;
4885 raw_spin_unlock_irqrestore(&pool->lock, flags);
4886 }
4887 rcu_read_unlock();
4888
4889 return ret;
4890}
4891EXPORT_SYMBOL_GPL(work_busy);
4892
4893/**
4894 * set_worker_desc - set description for the current work item
4895 * @fmt: printf-style format string
4896 * @...: arguments for the format string
4897 *
4898 * This function can be called by a running work function to describe what
4899 * the work item is about. If the worker task gets dumped, this
4900 * information will be printed out together to help debugging. The
4901 * description can be at most WORKER_DESC_LEN including the trailing '\0'.
4902 */
4903void set_worker_desc(const char *fmt, ...)
4904{
4905 struct worker *worker = current_wq_worker();
4906 va_list args;
4907
4908 if (worker) {
4909 va_start(args, fmt);
4910 vsnprintf(worker->desc, sizeof(worker->desc), fmt, args);
4911 va_end(args);
4912 }
4913}
4914EXPORT_SYMBOL_GPL(set_worker_desc);
4915
4916/**
4917 * print_worker_info - print out worker information and description
4918 * @log_lvl: the log level to use when printing
4919 * @task: target task
4920 *
4921 * If @task is a worker and currently executing a work item, print out the
4922 * name of the workqueue being serviced and worker description set with
4923 * set_worker_desc() by the currently executing work item.
4924 *
4925 * This function can be safely called on any task as long as the
4926 * task_struct itself is accessible. While safe, this function isn't
4927 * synchronized and may print out mixups or garbages of limited length.
4928 */
4929void print_worker_info(const char *log_lvl, struct task_struct *task)
4930{
4931 work_func_t *fn = NULL;
4932 char name[WQ_NAME_LEN] = { };
4933 char desc[WORKER_DESC_LEN] = { };
4934 struct pool_workqueue *pwq = NULL;
4935 struct workqueue_struct *wq = NULL;
4936 struct worker *worker;
4937
4938 if (!(task->flags & PF_WQ_WORKER))
4939 return;
4940
4941 /*
4942 * This function is called without any synchronization and @task
4943 * could be in any state. Be careful with dereferences.
4944 */
4945 worker = kthread_probe_data(task);
4946
4947 /*
4948 * Carefully copy the associated workqueue's workfn, name and desc.
4949 * Keep the original last '\0' in case the original is garbage.
4950 */
4951 copy_from_kernel_nofault(&fn, &worker->current_func, sizeof(fn));
4952 copy_from_kernel_nofault(&pwq, &worker->current_pwq, sizeof(pwq));
4953 copy_from_kernel_nofault(&wq, &pwq->wq, sizeof(wq));
4954 copy_from_kernel_nofault(name, wq->name, sizeof(name) - 1);
4955 copy_from_kernel_nofault(desc, worker->desc, sizeof(desc) - 1);
4956
4957 if (fn || name[0] || desc[0]) {
4958 printk("%sWorkqueue: %s %ps", log_lvl, name, fn);
4959 if (strcmp(name, desc))
4960 pr_cont(" (%s)", desc);
4961 pr_cont("\n");
4962 }
4963}
4964
4965static void pr_cont_pool_info(struct worker_pool *pool)
4966{
4967 pr_cont(" cpus=%*pbl", nr_cpumask_bits, pool->attrs->cpumask);
4968 if (pool->node != NUMA_NO_NODE)
4969 pr_cont(" node=%d", pool->node);
4970 pr_cont(" flags=0x%x nice=%d", pool->flags, pool->attrs->nice);
4971}
4972
4973struct pr_cont_work_struct {
4974 bool comma;
4975 work_func_t func;
4976 long ctr;
4977};
4978
4979static void pr_cont_work_flush(bool comma, work_func_t func, struct pr_cont_work_struct *pcwsp)
4980{
4981 if (!pcwsp->ctr)
4982 goto out_record;
4983 if (func == pcwsp->func) {
4984 pcwsp->ctr++;
4985 return;
4986 }
4987 if (pcwsp->ctr == 1)
4988 pr_cont("%s %ps", pcwsp->comma ? "," : "", pcwsp->func);
4989 else
4990 pr_cont("%s %ld*%ps", pcwsp->comma ? "," : "", pcwsp->ctr, pcwsp->func);
4991 pcwsp->ctr = 0;
4992out_record:
4993 if ((long)func == -1L)
4994 return;
4995 pcwsp->comma = comma;
4996 pcwsp->func = func;
4997 pcwsp->ctr = 1;
4998}
4999
5000static void pr_cont_work(bool comma, struct work_struct *work, struct pr_cont_work_struct *pcwsp)
5001{
5002 if (work->func == wq_barrier_func) {
5003 struct wq_barrier *barr;
5004
5005 barr = container_of(work, struct wq_barrier, work);
5006
5007 pr_cont_work_flush(comma, (work_func_t)-1, pcwsp);
5008 pr_cont("%s BAR(%d)", comma ? "," : "",
5009 task_pid_nr(barr->task));
5010 } else {
5011 if (!comma)
5012 pr_cont_work_flush(comma, (work_func_t)-1, pcwsp);
5013 pr_cont_work_flush(comma, work->func, pcwsp);
5014 }
5015}
5016
5017static void show_pwq(struct pool_workqueue *pwq)
5018{
5019 struct pr_cont_work_struct pcws = { .ctr = 0, };
5020 struct worker_pool *pool = pwq->pool;
5021 struct work_struct *work;
5022 struct worker *worker;
5023 bool has_in_flight = false, has_pending = false;
5024 int bkt;
5025
5026 pr_info(" pwq %d:", pool->id);
5027 pr_cont_pool_info(pool);
5028
5029 pr_cont(" active=%d/%d refcnt=%d%s\n",
5030 pwq->nr_active, pwq->max_active, pwq->refcnt,
5031 !list_empty(&pwq->mayday_node) ? " MAYDAY" : "");
5032
5033 hash_for_each(pool->busy_hash, bkt, worker, hentry) {
5034 if (worker->current_pwq == pwq) {
5035 has_in_flight = true;
5036 break;
5037 }
5038 }
5039 if (has_in_flight) {
5040 bool comma = false;
5041
5042 pr_info(" in-flight:");
5043 hash_for_each(pool->busy_hash, bkt, worker, hentry) {
5044 if (worker->current_pwq != pwq)
5045 continue;
5046
5047 pr_cont("%s %d%s:%ps", comma ? "," : "",
5048 task_pid_nr(worker->task),
5049 worker->rescue_wq ? "(RESCUER)" : "",
5050 worker->current_func);
5051 list_for_each_entry(work, &worker->scheduled, entry)
5052 pr_cont_work(false, work, &pcws);
5053 pr_cont_work_flush(comma, (work_func_t)-1L, &pcws);
5054 comma = true;
5055 }
5056 pr_cont("\n");
5057 }
5058
5059 list_for_each_entry(work, &pool->worklist, entry) {
5060 if (get_work_pwq(work) == pwq) {
5061 has_pending = true;
5062 break;
5063 }
5064 }
5065 if (has_pending) {
5066 bool comma = false;
5067
5068 pr_info(" pending:");
5069 list_for_each_entry(work, &pool->worklist, entry) {
5070 if (get_work_pwq(work) != pwq)
5071 continue;
5072
5073 pr_cont_work(comma, work, &pcws);
5074 comma = !(*work_data_bits(work) & WORK_STRUCT_LINKED);
5075 }
5076 pr_cont_work_flush(comma, (work_func_t)-1L, &pcws);
5077 pr_cont("\n");
5078 }
5079
5080 if (!list_empty(&pwq->inactive_works)) {
5081 bool comma = false;
5082
5083 pr_info(" inactive:");
5084 list_for_each_entry(work, &pwq->inactive_works, entry) {
5085 pr_cont_work(comma, work, &pcws);
5086 comma = !(*work_data_bits(work) & WORK_STRUCT_LINKED);
5087 }
5088 pr_cont_work_flush(comma, (work_func_t)-1L, &pcws);
5089 pr_cont("\n");
5090 }
5091}
5092
5093/**
5094 * show_one_workqueue - dump state of specified workqueue
5095 * @wq: workqueue whose state will be printed
5096 */
5097void show_one_workqueue(struct workqueue_struct *wq)
5098{
5099 struct pool_workqueue *pwq;
5100 bool idle = true;
5101 unsigned long flags;
5102
5103 for_each_pwq(pwq, wq) {
5104 if (pwq->nr_active || !list_empty(&pwq->inactive_works)) {
5105 idle = false;
5106 break;
5107 }
5108 }
5109 if (idle) /* Nothing to print for idle workqueue */
5110 return;
5111
5112 pr_info("workqueue %s: flags=0x%x\n", wq->name, wq->flags);
5113
5114 for_each_pwq(pwq, wq) {
5115 raw_spin_lock_irqsave(&pwq->pool->lock, flags);
5116 if (pwq->nr_active || !list_empty(&pwq->inactive_works)) {
5117 /*
5118 * Defer printing to avoid deadlocks in console
5119 * drivers that queue work while holding locks
5120 * also taken in their write paths.
5121 */
5122 printk_deferred_enter();
5123 show_pwq(pwq);
5124 printk_deferred_exit();
5125 }
5126 raw_spin_unlock_irqrestore(&pwq->pool->lock, flags);
5127 /*
5128 * We could be printing a lot from atomic context, e.g.
5129 * sysrq-t -> show_all_workqueues(). Avoid triggering
5130 * hard lockup.
5131 */
5132 touch_nmi_watchdog();
5133 }
5134
5135}
5136
5137/**
5138 * show_one_worker_pool - dump state of specified worker pool
5139 * @pool: worker pool whose state will be printed
5140 */
5141static void show_one_worker_pool(struct worker_pool *pool)
5142{
5143 struct worker *worker;
5144 bool first = true;
5145 unsigned long flags;
5146 unsigned long hung = 0;
5147
5148 raw_spin_lock_irqsave(&pool->lock, flags);
5149 if (pool->nr_workers == pool->nr_idle)
5150 goto next_pool;
5151
5152 /* How long the first pending work is waiting for a worker. */
5153 if (!list_empty(&pool->worklist))
5154 hung = jiffies_to_msecs(jiffies - pool->watchdog_ts) / 1000;
5155
5156 /*
5157 * Defer printing to avoid deadlocks in console drivers that
5158 * queue work while holding locks also taken in their write
5159 * paths.
5160 */
5161 printk_deferred_enter();
5162 pr_info("pool %d:", pool->id);
5163 pr_cont_pool_info(pool);
5164 pr_cont(" hung=%lus workers=%d", hung, pool->nr_workers);
5165 if (pool->manager)
5166 pr_cont(" manager: %d",
5167 task_pid_nr(pool->manager->task));
5168 list_for_each_entry(worker, &pool->idle_list, entry) {
5169 pr_cont(" %s%d", first ? "idle: " : "",
5170 task_pid_nr(worker->task));
5171 first = false;
5172 }
5173 pr_cont("\n");
5174 printk_deferred_exit();
5175next_pool:
5176 raw_spin_unlock_irqrestore(&pool->lock, flags);
5177 /*
5178 * We could be printing a lot from atomic context, e.g.
5179 * sysrq-t -> show_all_workqueues(). Avoid triggering
5180 * hard lockup.
5181 */
5182 touch_nmi_watchdog();
5183
5184}
5185
5186/**
5187 * show_all_workqueues - dump workqueue state
5188 *
5189 * Called from a sysrq handler and prints out all busy workqueues and pools.
5190 */
5191void show_all_workqueues(void)
5192{
5193 struct workqueue_struct *wq;
5194 struct worker_pool *pool;
5195 int pi;
5196
5197 rcu_read_lock();
5198
5199 pr_info("Showing busy workqueues and worker pools:\n");
5200
5201 list_for_each_entry_rcu(wq, &workqueues, list)
5202 show_one_workqueue(wq);
5203
5204 for_each_pool(pool, pi)
5205 show_one_worker_pool(pool);
5206
5207 rcu_read_unlock();
5208}
5209
5210/**
5211 * show_freezable_workqueues - dump freezable workqueue state
5212 *
5213 * Called from try_to_freeze_tasks() and prints out all freezable workqueues
5214 * still busy.
5215 */
5216void show_freezable_workqueues(void)
5217{
5218 struct workqueue_struct *wq;
5219
5220 rcu_read_lock();
5221
5222 pr_info("Showing freezable workqueues that are still busy:\n");
5223
5224 list_for_each_entry_rcu(wq, &workqueues, list) {
5225 if (!(wq->flags & WQ_FREEZABLE))
5226 continue;
5227 show_one_workqueue(wq);
5228 }
5229
5230 rcu_read_unlock();
5231}
5232
5233/* used to show worker information through /proc/PID/{comm,stat,status} */
5234void wq_worker_comm(char *buf, size_t size, struct task_struct *task)
5235{
5236 int off;
5237
5238 /* always show the actual comm */
5239 off = strscpy(buf, task->comm, size);
5240 if (off < 0)
5241 return;
5242
5243 /* stabilize PF_WQ_WORKER and worker pool association */
5244 mutex_lock(&wq_pool_attach_mutex);
5245
5246 if (task->flags & PF_WQ_WORKER) {
5247 struct worker *worker = kthread_data(task);
5248 struct worker_pool *pool = worker->pool;
5249
5250 if (pool) {
5251 raw_spin_lock_irq(&pool->lock);
5252 /*
5253 * ->desc tracks information (wq name or
5254 * set_worker_desc()) for the latest execution. If
5255 * current, prepend '+', otherwise '-'.
5256 */
5257 if (worker->desc[0] != '\0') {
5258 if (worker->current_work)
5259 scnprintf(buf + off, size - off, "+%s",
5260 worker->desc);
5261 else
5262 scnprintf(buf + off, size - off, "-%s",
5263 worker->desc);
5264 }
5265 raw_spin_unlock_irq(&pool->lock);
5266 }
5267 }
5268
5269 mutex_unlock(&wq_pool_attach_mutex);
5270}
5271
5272#ifdef CONFIG_SMP
5273
5274/*
5275 * CPU hotplug.
5276 *
5277 * There are two challenges in supporting CPU hotplug. Firstly, there
5278 * are a lot of assumptions on strong associations among work, pwq and
5279 * pool which make migrating pending and scheduled works very
5280 * difficult to implement without impacting hot paths. Secondly,
 * worker pools serve a mix of short, long and very long running works, making
 * blocked draining impractical.
5283 *
5284 * This is solved by allowing the pools to be disassociated from the CPU
5285 * running as an unbound one and allowing it to be reattached later if the
5286 * cpu comes back online.
5287 */
5288
5289static void unbind_workers(int cpu)
5290{
5291 struct worker_pool *pool;
5292 struct worker *worker;
5293
5294 for_each_cpu_worker_pool(pool, cpu) {
5295 mutex_lock(&wq_pool_attach_mutex);
5296 raw_spin_lock_irq(&pool->lock);
5297
5298 /*
5299 * We've blocked all attach/detach operations. Make all workers
5300 * unbound and set DISASSOCIATED. Before this, all workers
5301 * must be on the cpu. After this, they may become diasporas.
5302 * And the preemption disabled section in their sched callbacks
5303 * are guaranteed to see WORKER_UNBOUND since the code here
5304 * is on the same cpu.
5305 */
5306 for_each_pool_worker(worker, pool)
5307 worker->flags |= WORKER_UNBOUND;
5308
5309 pool->flags |= POOL_DISASSOCIATED;
5310
5311 /*
5312 * The handling of nr_running in sched callbacks are disabled
5313 * now. Zap nr_running. After this, nr_running stays zero and
5314 * need_more_worker() and keep_working() are always true as
5315 * long as the worklist is not empty. This pool now behaves as
5316 * an unbound (in terms of concurrency management) pool which
5317 * are served by workers tied to the pool.
5318 */
5319 pool->nr_running = 0;
5320
5321 /*
5322 * With concurrency management just turned off, a busy
5323 * worker blocking could lead to lengthy stalls. Kick off
5324 * unbound chain execution of currently pending work items.
5325 */
5326 wake_up_worker(pool);
5327
5328 raw_spin_unlock_irq(&pool->lock);
5329
5330 for_each_pool_worker(worker, pool)
5331 unbind_worker(worker);
5332
5333 mutex_unlock(&wq_pool_attach_mutex);
5334 }
5335}
5336
5337/**
5338 * rebind_workers - rebind all workers of a pool to the associated CPU
5339 * @pool: pool of interest
5340 *
5341 * @pool->cpu is coming online. Rebind all workers to the CPU.
5342 */
5343static void rebind_workers(struct worker_pool *pool)
5344{
5345 struct worker *worker;
5346
5347 lockdep_assert_held(&wq_pool_attach_mutex);
5348
5349 /*
5350 * Restore CPU affinity of all workers. As all idle workers should
5351 * be on the run-queue of the associated CPU before any local
5352 * wake-ups for concurrency management happen, restore CPU affinity
5353 * of all workers first and then clear UNBOUND. As we're called
5354 * from CPU_ONLINE, the following shouldn't fail.
5355 */
5356 for_each_pool_worker(worker, pool) {
5357 kthread_set_per_cpu(worker->task, pool->cpu);
5358 WARN_ON_ONCE(set_cpus_allowed_ptr(worker->task,
5359 pool->attrs->cpumask) < 0);
5360 }
5361
5362 raw_spin_lock_irq(&pool->lock);
5363
5364 pool->flags &= ~POOL_DISASSOCIATED;
5365
5366 for_each_pool_worker(worker, pool) {
5367 unsigned int worker_flags = worker->flags;
5368
5369 /*
5370 * We want to clear UNBOUND but can't directly call
5371 * worker_clr_flags() or adjust nr_running. Atomically
5372 * replace UNBOUND with another NOT_RUNNING flag REBOUND.
5373 * @worker will clear REBOUND using worker_clr_flags() when
5374 * it initiates the next execution cycle thus restoring
5375 * concurrency management. Note that when or whether
5376 * @worker clears REBOUND doesn't affect correctness.
5377 *
5378 * WRITE_ONCE() is necessary because @worker->flags may be
5379 * tested without holding any lock in
5380 * wq_worker_running(). Without it, NOT_RUNNING test may
5381 * fail incorrectly leading to premature concurrency
5382 * management operations.
5383 */
5384 WARN_ON_ONCE(!(worker_flags & WORKER_UNBOUND));
5385 worker_flags |= WORKER_REBOUND;
5386 worker_flags &= ~WORKER_UNBOUND;
5387 WRITE_ONCE(worker->flags, worker_flags);
5388 }
5389
5390 raw_spin_unlock_irq(&pool->lock);
5391}
5392
5393/**
5394 * restore_unbound_workers_cpumask - restore cpumask of unbound workers
5395 * @pool: unbound pool of interest
5396 * @cpu: the CPU which is coming up
5397 *
5398 * An unbound pool may end up with a cpumask which doesn't have any online
5399 * CPUs. When a worker of such pool get scheduled, the scheduler resets
5400 * its cpus_allowed. If @cpu is in @pool's cpumask which didn't have any
5401 * online CPU before, cpus_allowed of all its workers should be restored.
5402 */
5403static void restore_unbound_workers_cpumask(struct worker_pool *pool, int cpu)
5404{
5405 static cpumask_t cpumask;
5406 struct worker *worker;
5407
5408 lockdep_assert_held(&wq_pool_attach_mutex);
5409
5410 /* is @cpu allowed for @pool? */
5411 if (!cpumask_test_cpu(cpu, pool->attrs->cpumask))
5412 return;
5413
5414 cpumask_and(&cpumask, pool->attrs->cpumask, cpu_online_mask);
5415
5416 /* as we're called from CPU_ONLINE, the following shouldn't fail */
5417 for_each_pool_worker(worker, pool)
5418 WARN_ON_ONCE(set_cpus_allowed_ptr(worker->task, &cpumask) < 0);
5419}
5420
5421int workqueue_prepare_cpu(unsigned int cpu)
5422{
5423 struct worker_pool *pool;
5424
5425 for_each_cpu_worker_pool(pool, cpu) {
5426 if (pool->nr_workers)
5427 continue;
5428 if (!create_worker(pool))
5429 return -ENOMEM;
5430 }
5431 return 0;
5432}
5433
5434int workqueue_online_cpu(unsigned int cpu)
5435{
5436 struct worker_pool *pool;
5437 struct workqueue_struct *wq;
5438 int pi;
5439
5440 mutex_lock(&wq_pool_mutex);
5441
5442 for_each_pool(pool, pi) {
5443 mutex_lock(&wq_pool_attach_mutex);
5444
5445 if (pool->cpu == cpu)
5446 rebind_workers(pool);
5447 else if (pool->cpu < 0)
5448 restore_unbound_workers_cpumask(pool, cpu);
5449
5450 mutex_unlock(&wq_pool_attach_mutex);
5451 }
5452
5453 /* update NUMA affinity of unbound workqueues */
5454 list_for_each_entry(wq, &workqueues, list) {
5455 int tcpu;
5456
5457 for_each_possible_cpu(tcpu) {
5458 if (cpu_to_node(tcpu) == cpu_to_node(cpu)) {
5459 wq_update_unbound_numa(wq, tcpu, cpu, true);
5460 }
5461 }
5462 }
5463
5464 mutex_unlock(&wq_pool_mutex);
5465 return 0;
5466}
5467
5468int workqueue_offline_cpu(unsigned int cpu)
5469{
5470 struct workqueue_struct *wq;
5471
5472 /* unbinding per-cpu workers should happen on the local CPU */
5473 if (WARN_ON(cpu != smp_processor_id()))
5474 return -1;
5475
5476 unbind_workers(cpu);
5477
5478 /* update NUMA affinity of unbound workqueues */
5479 mutex_lock(&wq_pool_mutex);
5480 list_for_each_entry(wq, &workqueues, list) {
5481 int tcpu;
5482
5483 for_each_possible_cpu(tcpu) {
5484 if (cpu_to_node(tcpu) == cpu_to_node(cpu)) {
5485 wq_update_unbound_numa(wq, tcpu, cpu, false);
5486 }
5487 }
5488 }
5489 mutex_unlock(&wq_pool_mutex);
5490
5491 return 0;
5492}
5493
5494struct work_for_cpu {
5495 struct work_struct work;
5496 long (*fn)(void *);
5497 void *arg;
5498 long ret;
5499};
5500
5501static void work_for_cpu_fn(struct work_struct *work)
5502{
5503 struct work_for_cpu *wfc = container_of(work, struct work_for_cpu, work);
5504
5505 wfc->ret = wfc->fn(wfc->arg);
5506}
5507
5508/**
5509 * work_on_cpu - run a function in thread context on a particular cpu
5510 * @cpu: the cpu to run on
5511 * @fn: the function to run
5512 * @arg: the function arg
5513 *
5514 * It is up to the caller to ensure that the cpu doesn't go offline.
5515 * The caller must not hold any locks which would prevent @fn from completing.
5516 *
5517 * Return: The value @fn returns.
5518 */
5519long work_on_cpu(int cpu, long (*fn)(void *), void *arg)
5520{
5521 struct work_for_cpu wfc = { .fn = fn, .arg = arg };
5522
5523 INIT_WORK_ONSTACK(&wfc.work, work_for_cpu_fn);
5524 schedule_work_on(cpu, &wfc.work);
5525 flush_work(&wfc.work);
5526 destroy_work_on_stack(&wfc.work);
5527 return wfc.ret;
5528}
5529EXPORT_SYMBOL_GPL(work_on_cpu);
5530
5531/**
5532 * work_on_cpu_safe - run a function in thread context on a particular cpu
5533 * @cpu: the cpu to run on
5534 * @fn: the function to run
5535 * @arg: the function argument
5536 *
5537 * Disables CPU hotplug and calls work_on_cpu(). The caller must not hold
5538 * any locks which would prevent @fn from completing.
5539 *
5540 * Return: The value @fn returns.
5541 */
5542long work_on_cpu_safe(int cpu, long (*fn)(void *), void *arg)
5543{
5544 long ret = -ENODEV;
5545
5546 cpus_read_lock();
5547 if (cpu_online(cpu))
5548 ret = work_on_cpu(cpu, fn, arg);
5549 cpus_read_unlock();
5550 return ret;
5551}
5552EXPORT_SYMBOL_GPL(work_on_cpu_safe);
5553#endif /* CONFIG_SMP */
5554
5555#ifdef CONFIG_FREEZER
5556
5557/**
5558 * freeze_workqueues_begin - begin freezing workqueues
5559 *
5560 * Start freezing workqueues. After this function returns, all freezable
5561 * workqueues will queue new works to their inactive_works list instead of
5562 * pool->worklist.
5563 *
5564 * CONTEXT:
5565 * Grabs and releases wq_pool_mutex, wq->mutex and pool->lock's.
5566 */
5567void freeze_workqueues_begin(void)
5568{
5569 struct workqueue_struct *wq;
5570 struct pool_workqueue *pwq;
5571
5572 mutex_lock(&wq_pool_mutex);
5573
5574 WARN_ON_ONCE(workqueue_freezing);
5575 workqueue_freezing = true;
5576
5577 list_for_each_entry(wq, &workqueues, list) {
5578 mutex_lock(&wq->mutex);
5579 for_each_pwq(pwq, wq)
5580 pwq_adjust_max_active(pwq);
5581 mutex_unlock(&wq->mutex);
5582 }
5583
5584 mutex_unlock(&wq_pool_mutex);
5585}
5586
5587/**
5588 * freeze_workqueues_busy - are freezable workqueues still busy?
5589 *
5590 * Check whether freezing is complete. This function must be called
5591 * between freeze_workqueues_begin() and thaw_workqueues().
5592 *
5593 * CONTEXT:
5594 * Grabs and releases wq_pool_mutex.
5595 *
5596 * Return:
5597 * %true if some freezable workqueues are still busy. %false if freezing
5598 * is complete.
5599 */
5600bool freeze_workqueues_busy(void)
5601{
5602 bool busy = false;
5603 struct workqueue_struct *wq;
5604 struct pool_workqueue *pwq;
5605
5606 mutex_lock(&wq_pool_mutex);
5607
5608 WARN_ON_ONCE(!workqueue_freezing);
5609
5610 list_for_each_entry(wq, &workqueues, list) {
5611 if (!(wq->flags & WQ_FREEZABLE))
5612 continue;
5613 /*
5614 * nr_active is monotonically decreasing. It's safe
5615 * to peek without lock.
5616 */
5617 rcu_read_lock();
5618 for_each_pwq(pwq, wq) {
5619 WARN_ON_ONCE(pwq->nr_active < 0);
5620 if (pwq->nr_active) {
5621 busy = true;
5622 rcu_read_unlock();
5623 goto out_unlock;
5624 }
5625 }
5626 rcu_read_unlock();
5627 }
5628out_unlock:
5629 mutex_unlock(&wq_pool_mutex);
5630 return busy;
5631}
5632
5633/**
5634 * thaw_workqueues - thaw workqueues
5635 *
5636 * Thaw workqueues. Normal queueing is restored and all collected
5637 * frozen works are transferred to their respective pool worklists.
5638 *
5639 * CONTEXT:
5640 * Grabs and releases wq_pool_mutex, wq->mutex and pool->lock's.
5641 */
5642void thaw_workqueues(void)
5643{
5644 struct workqueue_struct *wq;
5645 struct pool_workqueue *pwq;
5646
5647 mutex_lock(&wq_pool_mutex);
5648
5649 if (!workqueue_freezing)
5650 goto out_unlock;
5651
5652 workqueue_freezing = false;
5653
5654 /* restore max_active and repopulate worklist */
5655 list_for_each_entry(wq, &workqueues, list) {
5656 mutex_lock(&wq->mutex);
5657 for_each_pwq(pwq, wq)
5658 pwq_adjust_max_active(pwq);
5659 mutex_unlock(&wq->mutex);
5660 }
5661
5662out_unlock:
5663 mutex_unlock(&wq_pool_mutex);
5664}
5665#endif /* CONFIG_FREEZER */
5666
5667static int workqueue_apply_unbound_cpumask(const cpumask_var_t unbound_cpumask)
5668{
5669 LIST_HEAD(ctxs);
5670 int ret = 0;
5671 struct workqueue_struct *wq;
5672 struct apply_wqattrs_ctx *ctx, *n;
5673
5674 lockdep_assert_held(&wq_pool_mutex);
5675
5676 list_for_each_entry(wq, &workqueues, list) {
5677 if (!(wq->flags & WQ_UNBOUND))
5678 continue;
5679 /* creating multiple pwqs breaks ordering guarantee */
5680 if (wq->flags & __WQ_ORDERED)
5681 continue;
5682
5683 ctx = apply_wqattrs_prepare(wq, wq->unbound_attrs, unbound_cpumask);
5684 if (!ctx) {
5685 ret = -ENOMEM;
5686 break;
5687 }
5688
5689 list_add_tail(&ctx->list, &ctxs);
5690 }
5691
5692 list_for_each_entry_safe(ctx, n, &ctxs, list) {
5693 if (!ret)
5694 apply_wqattrs_commit(ctx);
5695 apply_wqattrs_cleanup(ctx);
5696 }
5697
5698 if (!ret) {
5699 mutex_lock(&wq_pool_attach_mutex);
5700 cpumask_copy(wq_unbound_cpumask, unbound_cpumask);
5701 mutex_unlock(&wq_pool_attach_mutex);
5702 }
5703 return ret;
5704}
5705
5706/**
5707 * workqueue_set_unbound_cpumask - Set the low-level unbound cpumask
5708 * @cpumask: the cpumask to set
5709 *
5710 * The low-level workqueues cpumask is a global cpumask that limits
5711 * the affinity of all unbound workqueues. This function check the @cpumask
5712 * and apply it to all unbound workqueues and updates all pwqs of them.
5713 *
5714 * Return: 0 - Success
5715 * -EINVAL - Invalid @cpumask
5716 * -ENOMEM - Failed to allocate memory for attrs or pwqs.
5717 */
5718int workqueue_set_unbound_cpumask(cpumask_var_t cpumask)
5719{
5720 int ret = -EINVAL;
5721
5722 /*
5723 * Not excluding isolated cpus on purpose.
5724 * If the user wishes to include them, we allow that.
5725 */
5726 cpumask_and(cpumask, cpumask, cpu_possible_mask);
5727 if (!cpumask_empty(cpumask)) {
5728 apply_wqattrs_lock();
5729 if (cpumask_equal(cpumask, wq_unbound_cpumask)) {
5730 ret = 0;
5731 goto out_unlock;
5732 }
5733
5734 ret = workqueue_apply_unbound_cpumask(cpumask);
5735
5736out_unlock:
5737 apply_wqattrs_unlock();
5738 }
5739
5740 return ret;
5741}
5742
5743#ifdef CONFIG_SYSFS
5744/*
5745 * Workqueues with WQ_SYSFS flag set is visible to userland via
5746 * /sys/bus/workqueue/devices/WQ_NAME. All visible workqueues have the
5747 * following attributes.
5748 *
5749 * per_cpu RO bool : whether the workqueue is per-cpu or unbound
5750 * max_active RW int : maximum number of in-flight work items
5751 *
5752 * Unbound workqueues have the following extra attributes.
5753 *
5754 * nice RW int : nice value of the workers
5755 * cpumask RW mask : bitmask of allowed CPUs for the workers
5756 */
5757struct wq_device {
5758 struct workqueue_struct *wq;
5759 struct device dev;
5760};
5761
5762static struct workqueue_struct *dev_to_wq(struct device *dev)
5763{
5764 struct wq_device *wq_dev = container_of(dev, struct wq_device, dev);
5765
5766 return wq_dev->wq;
5767}
5768
5769static ssize_t per_cpu_show(struct device *dev, struct device_attribute *attr,
5770 char *buf)
5771{
5772 struct workqueue_struct *wq = dev_to_wq(dev);
5773
5774 return scnprintf(buf, PAGE_SIZE, "%d\n", (bool)!(wq->flags & WQ_UNBOUND));
5775}
5776static DEVICE_ATTR_RO(per_cpu);
5777
5778static ssize_t max_active_show(struct device *dev,
5779 struct device_attribute *attr, char *buf)
5780{
5781 struct workqueue_struct *wq = dev_to_wq(dev);
5782
5783 return scnprintf(buf, PAGE_SIZE, "%d\n", wq->saved_max_active);
5784}
5785
5786static ssize_t max_active_store(struct device *dev,
5787 struct device_attribute *attr, const char *buf,
5788 size_t count)
5789{
5790 struct workqueue_struct *wq = dev_to_wq(dev);
5791 int val;
5792
5793 if (sscanf(buf, "%d", &val) != 1 || val <= 0)
5794 return -EINVAL;
5795
5796 workqueue_set_max_active(wq, val);
5797 return count;
5798}
5799static DEVICE_ATTR_RW(max_active);
5800
5801static struct attribute *wq_sysfs_attrs[] = {
5802 &dev_attr_per_cpu.attr,
5803 &dev_attr_max_active.attr,
5804 NULL,
5805};
5806ATTRIBUTE_GROUPS(wq_sysfs);
5807
5808static ssize_t wq_nice_show(struct device *dev, struct device_attribute *attr,
5809 char *buf)
5810{
5811 struct workqueue_struct *wq = dev_to_wq(dev);
5812 int written;
5813
5814 mutex_lock(&wq->mutex);
5815 written = scnprintf(buf, PAGE_SIZE, "%d\n", wq->unbound_attrs->nice);
5816 mutex_unlock(&wq->mutex);
5817
5818 return written;
5819}
5820
5821/* prepare workqueue_attrs for sysfs store operations */
5822static struct workqueue_attrs *wq_sysfs_prep_attrs(struct workqueue_struct *wq)
5823{
5824 struct workqueue_attrs *attrs;
5825
5826 lockdep_assert_held(&wq_pool_mutex);
5827
5828 attrs = alloc_workqueue_attrs();
5829 if (!attrs)
5830 return NULL;
5831
5832 copy_workqueue_attrs(attrs, wq->unbound_attrs);
5833 return attrs;
5834}
5835
5836static ssize_t wq_nice_store(struct device *dev, struct device_attribute *attr,
5837 const char *buf, size_t count)
5838{
5839 struct workqueue_struct *wq = dev_to_wq(dev);
5840 struct workqueue_attrs *attrs;
5841 int ret = -ENOMEM;
5842
5843 apply_wqattrs_lock();
5844
5845 attrs = wq_sysfs_prep_attrs(wq);
5846 if (!attrs)
5847 goto out_unlock;
5848
5849 if (sscanf(buf, "%d", &attrs->nice) == 1 &&
5850 attrs->nice >= MIN_NICE && attrs->nice <= MAX_NICE)
5851 ret = apply_workqueue_attrs_locked(wq, attrs);
5852 else
5853 ret = -EINVAL;
5854
5855out_unlock:
5856 apply_wqattrs_unlock();
5857 free_workqueue_attrs(attrs);
5858 return ret ?: count;
5859}
5860
5861static ssize_t wq_cpumask_show(struct device *dev,
5862 struct device_attribute *attr, char *buf)
5863{
5864 struct workqueue_struct *wq = dev_to_wq(dev);
5865 int written;
5866
5867 mutex_lock(&wq->mutex);
5868 written = scnprintf(buf, PAGE_SIZE, "%*pb\n",
5869 cpumask_pr_args(wq->unbound_attrs->cpumask));
5870 mutex_unlock(&wq->mutex);
5871 return written;
5872}
5873
5874static ssize_t wq_cpumask_store(struct device *dev,
5875 struct device_attribute *attr,
5876 const char *buf, size_t count)
5877{
5878 struct workqueue_struct *wq = dev_to_wq(dev);
5879 struct workqueue_attrs *attrs;
5880 int ret = -ENOMEM;
5881
5882 apply_wqattrs_lock();
5883
5884 attrs = wq_sysfs_prep_attrs(wq);
5885 if (!attrs)
5886 goto out_unlock;
5887
5888 ret = cpumask_parse(buf, attrs->cpumask);
5889 if (!ret)
5890 ret = apply_workqueue_attrs_locked(wq, attrs);
5891
5892out_unlock:
5893 apply_wqattrs_unlock();
5894 free_workqueue_attrs(attrs);
5895 return ret ?: count;
5896}
5897
5898static struct device_attribute wq_sysfs_unbound_attrs[] = {
5899 __ATTR(nice, 0644, wq_nice_show, wq_nice_store),
5900 __ATTR(cpumask, 0644, wq_cpumask_show, wq_cpumask_store),
5901 __ATTR_NULL,
5902};
5903
5904static struct bus_type wq_subsys = {
5905 .name = "workqueue",
5906 .dev_groups = wq_sysfs_groups,
5907};
5908
5909static ssize_t wq_unbound_cpumask_show(struct device *dev,
5910 struct device_attribute *attr, char *buf)
5911{
5912 int written;
5913
5914 mutex_lock(&wq_pool_mutex);
5915 written = scnprintf(buf, PAGE_SIZE, "%*pb\n",
5916 cpumask_pr_args(wq_unbound_cpumask));
5917 mutex_unlock(&wq_pool_mutex);
5918
5919 return written;
5920}
5921
5922static ssize_t wq_unbound_cpumask_store(struct device *dev,
5923 struct device_attribute *attr, const char *buf, size_t count)
5924{
5925 cpumask_var_t cpumask;
5926 int ret;
5927
5928 if (!zalloc_cpumask_var(&cpumask, GFP_KERNEL))
5929 return -ENOMEM;
5930
5931 ret = cpumask_parse(buf, cpumask);
5932 if (!ret)
5933 ret = workqueue_set_unbound_cpumask(cpumask);
5934
5935 free_cpumask_var(cpumask);
5936 return ret ? ret : count;
5937}
5938
5939static struct device_attribute wq_sysfs_cpumask_attr =
5940 __ATTR(cpumask, 0644, wq_unbound_cpumask_show,
5941 wq_unbound_cpumask_store);
5942
5943static int __init wq_sysfs_init(void)
5944{
5945 struct device *dev_root;
5946 int err;
5947
5948 err = subsys_virtual_register(&wq_subsys, NULL);
5949 if (err)
5950 return err;
5951
5952 dev_root = bus_get_dev_root(&wq_subsys);
5953 if (dev_root) {
5954 err = device_create_file(dev_root, &wq_sysfs_cpumask_attr);
5955 put_device(dev_root);
5956 }
5957 return err;
5958}
5959core_initcall(wq_sysfs_init);
5960
5961static void wq_device_release(struct device *dev)
5962{
5963 struct wq_device *wq_dev = container_of(dev, struct wq_device, dev);
5964
5965 kfree(wq_dev);
5966}
5967
5968/**
5969 * workqueue_sysfs_register - make a workqueue visible in sysfs
5970 * @wq: the workqueue to register
5971 *
5972 * Expose @wq in sysfs under /sys/bus/workqueue/devices.
5973 * alloc_workqueue*() automatically calls this function if WQ_SYSFS is set
5974 * which is the preferred method.
5975 *
5976 * Workqueue user should use this function directly iff it wants to apply
5977 * workqueue_attrs before making the workqueue visible in sysfs; otherwise,
5978 * apply_workqueue_attrs() may race against userland updating the
5979 * attributes.
5980 *
5981 * Return: 0 on success, -errno on failure.
5982 */
5983int workqueue_sysfs_register(struct workqueue_struct *wq)
5984{
5985 struct wq_device *wq_dev;
5986 int ret;
5987
5988 /*
5989 * Adjusting max_active or creating new pwqs by applying
5990 * attributes breaks ordering guarantee. Disallow exposing ordered
5991 * workqueues.
5992 */
5993 if (WARN_ON(wq->flags & __WQ_ORDERED_EXPLICIT))
5994 return -EINVAL;
5995
5996 wq->wq_dev = wq_dev = kzalloc(sizeof(*wq_dev), GFP_KERNEL);
5997 if (!wq_dev)
5998 return -ENOMEM;
5999
6000 wq_dev->wq = wq;
6001 wq_dev->dev.bus = &wq_subsys;
6002 wq_dev->dev.release = wq_device_release;
6003 dev_set_name(&wq_dev->dev, "%s", wq->name);
6004
6005 /*
6006 * unbound_attrs are created separately. Suppress uevent until
6007 * everything is ready.
6008 */
6009 dev_set_uevent_suppress(&wq_dev->dev, true);
6010
6011 ret = device_register(&wq_dev->dev);
6012 if (ret) {
6013 put_device(&wq_dev->dev);
6014 wq->wq_dev = NULL;
6015 return ret;
6016 }
6017
6018 if (wq->flags & WQ_UNBOUND) {
6019 struct device_attribute *attr;
6020
6021 for (attr = wq_sysfs_unbound_attrs; attr->attr.name; attr++) {
6022 ret = device_create_file(&wq_dev->dev, attr);
6023 if (ret) {
6024 device_unregister(&wq_dev->dev);
6025 wq->wq_dev = NULL;
6026 return ret;
6027 }
6028 }
6029 }
6030
6031 dev_set_uevent_suppress(&wq_dev->dev, false);
6032 kobject_uevent(&wq_dev->dev.kobj, KOBJ_ADD);
6033 return 0;
6034}
6035
6036/**
6037 * workqueue_sysfs_unregister - undo workqueue_sysfs_register()
6038 * @wq: the workqueue to unregister
6039 *
6040 * If @wq is registered to sysfs by workqueue_sysfs_register(), unregister.
6041 */
6042static void workqueue_sysfs_unregister(struct workqueue_struct *wq)
6043{
6044 struct wq_device *wq_dev = wq->wq_dev;
6045
6046 if (!wq->wq_dev)
6047 return;
6048
6049 wq->wq_dev = NULL;
6050 device_unregister(&wq_dev->dev);
6051}
6052#else /* CONFIG_SYSFS */
/* Without CONFIG_SYSFS there is nothing to unregister. */
static void workqueue_sysfs_unregister(struct workqueue_struct *wq)
{
}
6054#endif /* CONFIG_SYSFS */
6055
6056/*
6057 * Workqueue watchdog.
6058 *
6059 * Stall may be caused by various bugs - missing WQ_MEM_RECLAIM, illegal
6060 * flush dependency, a concurrency managed work item which stays RUNNING
6061 * indefinitely. Workqueue stalls can be very difficult to debug as the
6062 * usual warning mechanisms don't trigger and internal workqueue state is
6063 * largely opaque.
6064 *
6065 * Workqueue watchdog monitors all worker pools periodically and dumps
6066 * state if some pools failed to make forward progress for a while where
6067 * forward progress is defined as the first item on ->worklist changing.
6068 *
6069 * This mechanism is controlled through the kernel parameter
6070 * "workqueue.watchdog_thresh" which can be updated at runtime through the
6071 * corresponding sysfs parameter file.
6072 */
6073#ifdef CONFIG_WQ_WATCHDOG
6074
6075static unsigned long wq_watchdog_thresh = 30;
6076static struct timer_list wq_watchdog_timer;
6077
6078static unsigned long wq_watchdog_touched = INITIAL_JIFFIES;
6079static DEFINE_PER_CPU(unsigned long, wq_watchdog_touched_cpu) = INITIAL_JIFFIES;
6080
6081/*
6082 * Show workers that might prevent the processing of pending work items.
6083 * The only candidates are CPU-bound workers in the running state.
6084 * Pending work items should be handled by another idle worker
6085 * in all other situations.
6086 */
6087static void show_cpu_pool_hog(struct worker_pool *pool)
6088{
6089 struct worker *worker;
6090 unsigned long flags;
6091 int bkt;
6092
6093 raw_spin_lock_irqsave(&pool->lock, flags);
6094
6095 hash_for_each(pool->busy_hash, bkt, worker, hentry) {
6096 if (task_is_running(worker->task)) {
6097 /*
6098 * Defer printing to avoid deadlocks in console
6099 * drivers that queue work while holding locks
6100 * also taken in their write paths.
6101 */
6102 printk_deferred_enter();
6103
6104 pr_info("pool %d:\n", pool->id);
6105 sched_show_task(worker->task);
6106
6107 printk_deferred_exit();
6108 }
6109 }
6110
6111 raw_spin_unlock_irqrestore(&pool->lock, flags);
6112}
6113
6114static void show_cpu_pools_hogs(void)
6115{
6116 struct worker_pool *pool;
6117 int pi;
6118
6119 pr_info("Showing backtraces of running workers in stalled CPU-bound worker pools:\n");
6120
6121 rcu_read_lock();
6122
6123 for_each_pool(pool, pi) {
6124 if (pool->cpu_stall)
6125 show_cpu_pool_hog(pool);
6126
6127 }
6128
6129 rcu_read_unlock();
6130}
6131
6132static void wq_watchdog_reset_touched(void)
6133{
6134 int cpu;
6135
6136 wq_watchdog_touched = jiffies;
6137 for_each_possible_cpu(cpu)
6138 per_cpu(wq_watchdog_touched_cpu, cpu) = jiffies;
6139}
6140
6141static void wq_watchdog_timer_fn(struct timer_list *unused)
6142{
6143 unsigned long thresh = READ_ONCE(wq_watchdog_thresh) * HZ;
6144 bool lockup_detected = false;
6145 bool cpu_pool_stall = false;
6146 unsigned long now = jiffies;
6147 struct worker_pool *pool;
6148 int pi;
6149
6150 if (!thresh)
6151 return;
6152
6153 rcu_read_lock();
6154
6155 for_each_pool(pool, pi) {
6156 unsigned long pool_ts, touched, ts;
6157
6158 pool->cpu_stall = false;
6159 if (list_empty(&pool->worklist))
6160 continue;
6161
6162 /*
6163 * If a virtual machine is stopped by the host it can look to
6164 * the watchdog like a stall.
6165 */
6166 kvm_check_and_clear_guest_paused();
6167
6168 /* get the latest of pool and touched timestamps */
6169 if (pool->cpu >= 0)
6170 touched = READ_ONCE(per_cpu(wq_watchdog_touched_cpu, pool->cpu));
6171 else
6172 touched = READ_ONCE(wq_watchdog_touched);
6173 pool_ts = READ_ONCE(pool->watchdog_ts);
6174
6175 if (time_after(pool_ts, touched))
6176 ts = pool_ts;
6177 else
6178 ts = touched;
6179
6180 /* did we stall? */
6181 if (time_after(now, ts + thresh)) {
6182 lockup_detected = true;
6183 if (pool->cpu >= 0) {
6184 pool->cpu_stall = true;
6185 cpu_pool_stall = true;
6186 }
6187 pr_emerg("BUG: workqueue lockup - pool");
6188 pr_cont_pool_info(pool);
6189 pr_cont(" stuck for %us!\n",
6190 jiffies_to_msecs(now - pool_ts) / 1000);
6191 }
6192
6193
6194 }
6195
6196 rcu_read_unlock();
6197
6198 if (lockup_detected)
6199 show_all_workqueues();
6200
6201 if (cpu_pool_stall)
6202 show_cpu_pools_hogs();
6203
6204 wq_watchdog_reset_touched();
6205 mod_timer(&wq_watchdog_timer, jiffies + thresh);
6206}
6207
6208notrace void wq_watchdog_touch(int cpu)
6209{
6210 if (cpu >= 0)
6211 per_cpu(wq_watchdog_touched_cpu, cpu) = jiffies;
6212
6213 wq_watchdog_touched = jiffies;
6214}
6215
6216static void wq_watchdog_set_thresh(unsigned long thresh)
6217{
6218 wq_watchdog_thresh = 0;
6219 del_timer_sync(&wq_watchdog_timer);
6220
6221 if (thresh) {
6222 wq_watchdog_thresh = thresh;
6223 wq_watchdog_reset_touched();
6224 mod_timer(&wq_watchdog_timer, jiffies + thresh * HZ);
6225 }
6226}
6227
6228static int wq_watchdog_param_set_thresh(const char *val,
6229 const struct kernel_param *kp)
6230{
6231 unsigned long thresh;
6232 int ret;
6233
6234 ret = kstrtoul(val, 0, &thresh);
6235 if (ret)
6236 return ret;
6237
6238 if (system_wq)
6239 wq_watchdog_set_thresh(thresh);
6240 else
6241 wq_watchdog_thresh = thresh;
6242
6243 return 0;
6244}
6245
6246static const struct kernel_param_ops wq_watchdog_thresh_ops = {
6247 .set = wq_watchdog_param_set_thresh,
6248 .get = param_get_ulong,
6249};
6250
6251module_param_cb(watchdog_thresh, &wq_watchdog_thresh_ops, &wq_watchdog_thresh,
6252 0644);
6253
6254static void wq_watchdog_init(void)
6255{
6256 timer_setup(&wq_watchdog_timer, wq_watchdog_timer_fn, TIMER_DEFERRABLE);
6257 wq_watchdog_set_thresh(wq_watchdog_thresh);
6258}
6259
6260#else /* CONFIG_WQ_WATCHDOG */
6261
/* Watchdog support is compiled out: nothing to initialize. */
static inline void wq_watchdog_init(void)
{
}
6263
6264#endif /* CONFIG_WQ_WATCHDOG */
6265
6266static void __init wq_numa_init(void)
6267{
6268 cpumask_var_t *tbl;
6269 int node, cpu;
6270
6271 if (num_possible_nodes() <= 1)
6272 return;
6273
6274 for_each_possible_cpu(cpu) {
6275 if (WARN_ON(cpu_to_node(cpu) == NUMA_NO_NODE)) {
6276 pr_warn("workqueue: NUMA node mapping not available for cpu%d, disabling NUMA support\n", cpu);
6277 return;
6278 }
6279 }
6280
6281 wq_update_unbound_numa_attrs_buf = alloc_workqueue_attrs();
6282 BUG_ON(!wq_update_unbound_numa_attrs_buf);
6283
6284 /*
6285 * We want masks of possible CPUs of each node which isn't readily
6286 * available. Build one from cpu_to_node() which should have been
6287 * fully initialized by now.
6288 */
6289 tbl = kcalloc(nr_node_ids, sizeof(tbl[0]), GFP_KERNEL);
6290 BUG_ON(!tbl);
6291
6292 for_each_node(node)
6293 BUG_ON(!zalloc_cpumask_var_node(&tbl[node], GFP_KERNEL,
6294 node_online(node) ? node : NUMA_NO_NODE));
6295
6296 for_each_possible_cpu(cpu) {
6297 node = cpu_to_node(cpu);
6298 cpumask_set_cpu(cpu, tbl[node]);
6299 }
6300
6301 wq_numa_possible_cpumask = tbl;
6302 wq_numa_enabled = true;
6303}
6304
6305/**
6306 * workqueue_init_early - early init for workqueue subsystem
6307 *
6308 * This is the first half of two-staged workqueue subsystem initialization
6309 * and invoked as soon as the bare basics - memory allocation, cpumasks and
6310 * idr are up. It sets up all the data structures and system workqueues
6311 * and allows early boot code to create workqueues and queue/cancel work
6312 * items. Actual work item execution starts only after kthreads can be
6313 * created and scheduled right before early initcalls.
6314 */
6315void __init workqueue_init_early(void)
6316{
6317 int std_nice[NR_STD_WORKER_POOLS] = { 0, HIGHPRI_NICE_LEVEL };
6318 int i, cpu;
6319
6320 BUILD_BUG_ON(__alignof__(struct pool_workqueue) < __alignof__(long long));
6321
6322 BUG_ON(!alloc_cpumask_var(&wq_unbound_cpumask, GFP_KERNEL));
6323 cpumask_copy(wq_unbound_cpumask, housekeeping_cpumask(HK_TYPE_WQ));
6324 cpumask_and(wq_unbound_cpumask, wq_unbound_cpumask, housekeeping_cpumask(HK_TYPE_DOMAIN));
6325
6326 if (!cpumask_empty(&wq_cmdline_cpumask))
6327 cpumask_and(wq_unbound_cpumask, wq_unbound_cpumask, &wq_cmdline_cpumask);
6328
6329 pwq_cache = KMEM_CACHE(pool_workqueue, SLAB_PANIC);
6330
6331 /* initialize CPU pools */
6332 for_each_possible_cpu(cpu) {
6333 struct worker_pool *pool;
6334
6335 i = 0;
6336 for_each_cpu_worker_pool(pool, cpu) {
6337 BUG_ON(init_worker_pool(pool));
6338 pool->cpu = cpu;
6339 cpumask_copy(pool->attrs->cpumask, cpumask_of(cpu));
6340 pool->attrs->nice = std_nice[i++];
6341 pool->node = cpu_to_node(cpu);
6342
6343 /* alloc pool ID */
6344 mutex_lock(&wq_pool_mutex);
6345 BUG_ON(worker_pool_assign_id(pool));
6346 mutex_unlock(&wq_pool_mutex);
6347 }
6348 }
6349
6350 /* create default unbound and ordered wq attrs */
6351 for (i = 0; i < NR_STD_WORKER_POOLS; i++) {
6352 struct workqueue_attrs *attrs;
6353
6354 BUG_ON(!(attrs = alloc_workqueue_attrs()));
6355 attrs->nice = std_nice[i];
6356 unbound_std_wq_attrs[i] = attrs;
6357
6358 /*
6359 * An ordered wq should have only one pwq as ordering is
6360 * guaranteed by max_active which is enforced by pwqs.
6361 * Turn off NUMA so that dfl_pwq is used for all nodes.
6362 */
6363 BUG_ON(!(attrs = alloc_workqueue_attrs()));
6364 attrs->nice = std_nice[i];
6365 attrs->no_numa = true;
6366 ordered_wq_attrs[i] = attrs;
6367 }
6368
6369 system_wq = alloc_workqueue("events", 0, 0);
6370 system_highpri_wq = alloc_workqueue("events_highpri", WQ_HIGHPRI, 0);
6371 system_long_wq = alloc_workqueue("events_long", 0, 0);
6372 system_unbound_wq = alloc_workqueue("events_unbound", WQ_UNBOUND,
6373 WQ_MAX_ACTIVE);
6374 system_freezable_wq = alloc_workqueue("events_freezable",
6375 WQ_FREEZABLE, 0);
6376 system_power_efficient_wq = alloc_workqueue("events_power_efficient",
6377 WQ_POWER_EFFICIENT, 0);
6378 system_freezable_power_efficient_wq = alloc_workqueue("events_freezable_power_efficient",
6379 WQ_FREEZABLE | WQ_POWER_EFFICIENT,
6380 0);
6381 BUG_ON(!system_wq || !system_highpri_wq || !system_long_wq ||
6382 !system_unbound_wq || !system_freezable_wq ||
6383 !system_power_efficient_wq ||
6384 !system_freezable_power_efficient_wq);
6385}
6386
6387static void __init wq_cpu_intensive_thresh_init(void)
6388{
6389 unsigned long thresh;
6390 unsigned long bogo;
6391
6392 /* if the user set it to a specific value, keep it */
6393 if (wq_cpu_intensive_thresh_us != ULONG_MAX)
6394 return;
6395
6396 pwq_release_worker = kthread_create_worker(0, "pool_workqueue_release");
6397 BUG_ON(IS_ERR(pwq_release_worker));
6398
6399 /*
6400 * The default of 10ms is derived from the fact that most modern (as of
6401 * 2023) processors can do a lot in 10ms and that it's just below what
6402 * most consider human-perceivable. However, the kernel also runs on a
6403 * lot slower CPUs including microcontrollers where the threshold is way
6404 * too low.
6405 *
6406 * Let's scale up the threshold upto 1 second if BogoMips is below 4000.
6407 * This is by no means accurate but it doesn't have to be. The mechanism
6408 * is still useful even when the threshold is fully scaled up. Also, as
6409 * the reports would usually be applicable to everyone, some machines
6410 * operating on longer thresholds won't significantly diminish their
6411 * usefulness.
6412 */
6413 thresh = 10 * USEC_PER_MSEC;
6414
6415 /* see init/calibrate.c for lpj -> BogoMIPS calculation */
6416 bogo = max_t(unsigned long, loops_per_jiffy / 500000 * HZ, 1);
6417 if (bogo < 4000)
6418 thresh = min_t(unsigned long, thresh * 4000 / bogo, USEC_PER_SEC);
6419
6420 pr_debug("wq_cpu_intensive_thresh: lpj=%lu BogoMIPS=%lu thresh_us=%lu\n",
6421 loops_per_jiffy, bogo, thresh);
6422
6423 wq_cpu_intensive_thresh_us = thresh;
6424}
6425
6426/**
6427 * workqueue_init - bring workqueue subsystem fully online
6428 *
6429 * This is the latter half of two-staged workqueue subsystem initialization
6430 * and invoked as soon as kthreads can be created and scheduled.
6431 * Workqueues have been created and work items queued on them, but there
6432 * are no kworkers executing the work items yet. Populate the worker pools
6433 * with the initial workers and enable future kworker creations.
6434 */
6435void __init workqueue_init(void)
6436{
6437 struct workqueue_struct *wq;
6438 struct worker_pool *pool;
6439 int cpu, bkt;
6440
6441 wq_cpu_intensive_thresh_init();
6442
6443 /*
6444 * It'd be simpler to initialize NUMA in workqueue_init_early() but
6445 * CPU to node mapping may not be available that early on some
6446 * archs such as power and arm64. As per-cpu pools created
6447 * previously could be missing node hint and unbound pools NUMA
6448 * affinity, fix them up.
6449 *
6450 * Also, while iterating workqueues, create rescuers if requested.
6451 */
6452 wq_numa_init();
6453
6454 mutex_lock(&wq_pool_mutex);
6455
6456 for_each_possible_cpu(cpu) {
6457 for_each_cpu_worker_pool(pool, cpu) {
6458 pool->node = cpu_to_node(cpu);
6459 }
6460 }
6461
6462 list_for_each_entry(wq, &workqueues, list) {
6463 wq_update_unbound_numa(wq, smp_processor_id(), smp_processor_id(),
6464 true);
6465 WARN(init_rescuer(wq),
6466 "workqueue: failed to create early rescuer for %s",
6467 wq->name);
6468 }
6469
6470 mutex_unlock(&wq_pool_mutex);
6471
6472 /* create the initial workers */
6473 for_each_online_cpu(cpu) {
6474 for_each_cpu_worker_pool(pool, cpu) {
6475 pool->flags &= ~POOL_DISASSOCIATED;
6476 BUG_ON(!create_worker(pool));
6477 }
6478 }
6479
6480 hash_for_each(unbound_pool_hash, bkt, pool, hash_node)
6481 BUG_ON(!create_worker(pool));
6482
6483 wq_online = true;
6484 wq_watchdog_init();
6485}
6486
/*
 * Print a warning that flushing system-wide workqueues is going away,
 * followed by a stack dump of the caller.
 */
void __warn_flushing_systemwide_wq(void)
{
	pr_warn("WARNING: Flushing system-wide workqueues will be prohibited in near future.\n");

	dump_stack();
}
EXPORT_SYMBOL(__warn_flushing_systemwide_wq);
6493
6494static int __init workqueue_unbound_cpus_setup(char *str)
6495{
6496 if (cpulist_parse(str, &wq_cmdline_cpumask) < 0) {
6497 cpumask_clear(&wq_cmdline_cpumask);
6498 pr_warn("workqueue.unbound_cpus: incorrect CPU range, using default\n");
6499 }
6500
6501 return 1;
6502}
6503__setup("workqueue.unbound_cpus=", workqueue_unbound_cpus_setup);