/* see manage_workers() for details on the two manager mutexes */
struct mutex manager_arb; /* manager arbitration */
+ struct worker *manager; /* L: purely informational */
struct mutex attach_mutex; /* attach/detach exclusion */
struct list_head workers; /* A: attached workers */
struct completion *detach_completion; /* all workers detached */
*/
struct workqueue_struct {
struct list_head pwqs; /* WR: all pwqs of this wq */
- struct list_head list; /* PL: list of all workqueues */
+ struct list_head list; /* PR: list of all workqueues */
struct mutex mutex; /* protects this wq */
int work_color; /* WQ: current work color */
#endif
char name[WQ_NAME_LEN]; /* I: workqueue name */
+ /*
+ * Destruction of workqueue_struct is sched-RCU protected to allow
+ * walking the workqueues list without grabbing wq_pool_mutex.
+ * This is used to dump all workqueues from sysrq.
+ */
+ struct rcu_head rcu;
+
/* hot fields used during command issue, aligned to cacheline */
unsigned int flags ____cacheline_aligned; /* WQ: WQ_* flags */
struct pool_workqueue __percpu *cpu_pwqs; /* I: per-cpu pwqs */
static DEFINE_MUTEX(wq_pool_mutex); /* protects pools and workqueues list */
static DEFINE_SPINLOCK(wq_mayday_lock); /* protects wq->maydays list */
-static LIST_HEAD(workqueues); /* PL: list of all workqueues */
+static LIST_HEAD(workqueues); /* PR: list of all workqueues */
static bool workqueue_freezing; /* PL: have wqs started freezing? */
/* the per-cpu worker pools */
static int worker_thread(void *__worker);
static void copy_workqueue_attrs(struct workqueue_attrs *to,
const struct workqueue_attrs *from);
+static void workqueue_sysfs_unregister(struct workqueue_struct *wq);
#define CREATE_TRACE_POINTS
#include <trace/events/workqueue.h>
*/
if (!mutex_trylock(&pool->manager_arb))
return false;
+ pool->manager = worker;
maybe_create_worker(pool);
+ pool->manager = NULL;
mutex_unlock(&pool->manager_arb);
return true;
}
struct wq_barrier {
struct work_struct work;
struct completion done;
+ struct task_struct *task; /* purely informational */
};
static void wq_barrier_func(struct work_struct *work)
INIT_WORK_ONSTACK(&barr->work, wq_barrier_func);
__set_bit(WORK_STRUCT_PENDING_BIT, work_data_bits(&barr->work));
init_completion(&barr->done);
+ barr->task = current;
/*
* If @target is currently being executed, schedule the
}
EXPORT_SYMBOL_GPL(flush_work);
+struct cwt_wait {
+ wait_queue_t wait;
+ struct work_struct *work;
+};
+
+static int cwt_wakefn(wait_queue_t *wait, unsigned mode, int sync, void *key)
+{
+ struct cwt_wait *cwait = container_of(wait, struct cwt_wait, wait);
+
+ if (cwait->work != key)
+ return 0;
+ return autoremove_wake_function(wait, mode, sync, key);
+}
+
static bool __cancel_work_timer(struct work_struct *work, bool is_dwork)
{
+ static DECLARE_WAIT_QUEUE_HEAD(cancel_waitq);
unsigned long flags;
int ret;
do {
ret = try_to_grab_pending(work, is_dwork, &flags);
/*
- * If someone else is canceling, wait for the same event it
- * would be waiting for before retrying.
+ * If someone else is already canceling, wait for it to
+ * finish. flush_work() doesn't work for PREEMPT_NONE
+ * because we may get scheduled between @work's completion
+ * and the other canceling task resuming and clearing
+ * CANCELING - flush_work() will return false immediately
+ * as @work is no longer busy, try_to_grab_pending() will
+ * return -ENOENT as @work is still being canceled and the
+ * other canceling task won't be able to clear CANCELING as
+ * we're hogging the CPU.
+ *
+ * Let's wait for completion using a waitqueue. As this
+ * may lead to the thundering herd problem, use a custom
+ * wake function which matches @work along with exclusive
+ * wait and wakeup.
*/
- if (unlikely(ret == -ENOENT))
- flush_work(work);
+ if (unlikely(ret == -ENOENT)) {
+ struct cwt_wait cwait;
+
+ init_wait(&cwait.wait);
+ cwait.wait.func = cwt_wakefn;
+ cwait.work = work;
+
+ prepare_to_wait_exclusive(&cancel_waitq, &cwait.wait,
+ TASK_UNINTERRUPTIBLE);
+ if (work_is_canceling(work))
+ schedule();
+ finish_wait(&cancel_waitq, &cwait.wait);
+ }
} while (unlikely(ret < 0));
/* tell other tasks trying to grab @work to back off */
flush_work(work);
clear_work_data(work);
+
+ /*
+ * Paired with prepare_to_wait() above so that either
+ * waitqueue_active() is visible here or !work_is_canceling() is
+ * visible there.
+ */
+ smp_mb();
+ if (waitqueue_active(&cancel_waitq))
+ __wake_up(&cancel_waitq, TASK_NORMAL, 1, work);
+
return ret;
}
}
EXPORT_SYMBOL_GPL(execute_in_process_context);
-#ifdef CONFIG_SYSFS
-/*
- * Workqueues with WQ_SYSFS flag set is visible to userland via
- * /sys/bus/workqueue/devices/WQ_NAME. All visible workqueues have the
- * following attributes.
- *
- * per_cpu RO bool : whether the workqueue is per-cpu or unbound
- * max_active RW int : maximum number of in-flight work items
- *
- * Unbound workqueues have the following extra attributes.
+/**
+ * free_workqueue_attrs - free a workqueue_attrs
+ * @attrs: workqueue_attrs to free
*
- * id RO int : the associated pool ID
- * nice RW int : nice value of the workers
- * cpumask RW mask : bitmask of allowed CPUs for the workers
+ * Undo alloc_workqueue_attrs().
*/
-struct wq_device {
- struct workqueue_struct *wq;
- struct device dev;
-};
-
-static struct workqueue_struct *dev_to_wq(struct device *dev)
+void free_workqueue_attrs(struct workqueue_attrs *attrs)
{
- struct wq_device *wq_dev = container_of(dev, struct wq_device, dev);
-
- return wq_dev->wq;
+ if (attrs) {
+ free_cpumask_var(attrs->cpumask);
+ kfree(attrs);
+ }
}
-static ssize_t per_cpu_show(struct device *dev, struct device_attribute *attr,
- char *buf)
+/**
+ * alloc_workqueue_attrs - allocate a workqueue_attrs
+ * @gfp_mask: allocation mask to use
+ *
+ * Allocate a new workqueue_attrs, initialize with default settings and
+ * return it.
+ *
+ * Return: The allocated new workqueue_attr on success. %NULL on failure.
+ */
+struct workqueue_attrs *alloc_workqueue_attrs(gfp_t gfp_mask)
{
- struct workqueue_struct *wq = dev_to_wq(dev);
+ struct workqueue_attrs *attrs;
- return scnprintf(buf, PAGE_SIZE, "%d\n", (bool)!(wq->flags & WQ_UNBOUND));
+ attrs = kzalloc(sizeof(*attrs), gfp_mask);
+ if (!attrs)
+ goto fail;
+ if (!alloc_cpumask_var(&attrs->cpumask, gfp_mask))
+ goto fail;
+
+ cpumask_copy(attrs->cpumask, cpu_possible_mask);
+ return attrs;
+fail:
+ free_workqueue_attrs(attrs);
+ return NULL;
}
-static DEVICE_ATTR_RO(per_cpu);
-static ssize_t max_active_show(struct device *dev,
- struct device_attribute *attr, char *buf)
+static void copy_workqueue_attrs(struct workqueue_attrs *to,
+ const struct workqueue_attrs *from)
{
- struct workqueue_struct *wq = dev_to_wq(dev);
-
- return scnprintf(buf, PAGE_SIZE, "%d\n", wq->saved_max_active);
+ to->nice = from->nice;
+ cpumask_copy(to->cpumask, from->cpumask);
+ /*
+ * Unlike hash and equality test, this function doesn't ignore
+ * ->no_numa as it is used for both pool and wq attrs. Instead,
+ * get_unbound_pool() explicitly clears ->no_numa after copying.
+ */
+ to->no_numa = from->no_numa;
}
-static ssize_t max_active_store(struct device *dev,
- struct device_attribute *attr, const char *buf,
- size_t count)
+/* hash value of the content of @attr */
+static u32 wqattrs_hash(const struct workqueue_attrs *attrs)
{
- struct workqueue_struct *wq = dev_to_wq(dev);
- int val;
-
- if (sscanf(buf, "%d", &val) != 1 || val <= 0)
- return -EINVAL;
+ u32 hash = 0;
- workqueue_set_max_active(wq, val);
- return count;
+ hash = jhash_1word(attrs->nice, hash);
+ hash = jhash(cpumask_bits(attrs->cpumask),
+ BITS_TO_LONGS(nr_cpumask_bits) * sizeof(long), hash);
+ return hash;
}
-static DEVICE_ATTR_RW(max_active);
-
-static struct attribute *wq_sysfs_attrs[] = {
- &dev_attr_per_cpu.attr,
- &dev_attr_max_active.attr,
- NULL,
-};
-ATTRIBUTE_GROUPS(wq_sysfs);
-static ssize_t wq_pool_ids_show(struct device *dev,
- struct device_attribute *attr, char *buf)
+/* content equality test */
+static bool wqattrs_equal(const struct workqueue_attrs *a,
+ const struct workqueue_attrs *b)
{
- struct workqueue_struct *wq = dev_to_wq(dev);
- const char *delim = "";
- int node, written = 0;
-
- rcu_read_lock_sched();
- for_each_node(node) {
- written += scnprintf(buf + written, PAGE_SIZE - written,
- "%s%d:%d", delim, node,
- unbound_pwq_by_node(wq, node)->pool->id);
- delim = " ";
- }
- written += scnprintf(buf + written, PAGE_SIZE - written, "\n");
- rcu_read_unlock_sched();
-
- return written;
+ if (a->nice != b->nice)
+ return false;
+ if (!cpumask_equal(a->cpumask, b->cpumask))
+ return false;
+ return true;
}
-static ssize_t wq_nice_show(struct device *dev, struct device_attribute *attr,
- char *buf)
+/**
+ * init_worker_pool - initialize a newly zalloc'd worker_pool
+ * @pool: worker_pool to initialize
+ *
+ * Initiailize a newly zalloc'd @pool. It also allocates @pool->attrs.
+ *
+ * Return: 0 on success, -errno on failure. Even on failure, all fields
+ * inside @pool proper are initialized and put_unbound_pool() can be called
+ * on @pool safely to release it.
+ */
+static int init_worker_pool(struct worker_pool *pool)
{
- struct workqueue_struct *wq = dev_to_wq(dev);
- int written;
+ spin_lock_init(&pool->lock);
+ pool->id = -1;
+ pool->cpu = -1;
+ pool->node = NUMA_NO_NODE;
+ pool->flags |= POOL_DISASSOCIATED;
+ INIT_LIST_HEAD(&pool->worklist);
+ INIT_LIST_HEAD(&pool->idle_list);
+ hash_init(pool->busy_hash);
- mutex_lock(&wq->mutex);
- written = scnprintf(buf, PAGE_SIZE, "%d\n", wq->unbound_attrs->nice);
- mutex_unlock(&wq->mutex);
+ init_timer_deferrable(&pool->idle_timer);
+ pool->idle_timer.function = idle_worker_timeout;
+ pool->idle_timer.data = (unsigned long)pool;
- return written;
-}
+ setup_timer(&pool->mayday_timer, pool_mayday_timeout,
+ (unsigned long)pool);
-/* prepare workqueue_attrs for sysfs store operations */
-static struct workqueue_attrs *wq_sysfs_prep_attrs(struct workqueue_struct *wq)
-{
- struct workqueue_attrs *attrs;
+ mutex_init(&pool->manager_arb);
+ mutex_init(&pool->attach_mutex);
+ INIT_LIST_HEAD(&pool->workers);
- attrs = alloc_workqueue_attrs(GFP_KERNEL);
- if (!attrs)
- return NULL;
+ ida_init(&pool->worker_ida);
+ INIT_HLIST_NODE(&pool->hash_node);
+ pool->refcnt = 1;
- mutex_lock(&wq->mutex);
- copy_workqueue_attrs(attrs, wq->unbound_attrs);
- mutex_unlock(&wq->mutex);
- return attrs;
+ /* shouldn't fail above this point */
+ pool->attrs = alloc_workqueue_attrs(GFP_KERNEL);
+ if (!pool->attrs)
+ return -ENOMEM;
+ return 0;
}
-static ssize_t wq_nice_store(struct device *dev, struct device_attribute *attr,
- const char *buf, size_t count)
+static void rcu_free_wq(struct rcu_head *rcu)
{
- struct workqueue_struct *wq = dev_to_wq(dev);
- struct workqueue_attrs *attrs;
- int ret;
-
- attrs = wq_sysfs_prep_attrs(wq);
- if (!attrs)
- return -ENOMEM;
+ struct workqueue_struct *wq =
+ container_of(rcu, struct workqueue_struct, rcu);
- if (sscanf(buf, "%d", &attrs->nice) == 1 &&
- attrs->nice >= MIN_NICE && attrs->nice <= MAX_NICE)
- ret = apply_workqueue_attrs(wq, attrs);
+ if (!(wq->flags & WQ_UNBOUND))
+ free_percpu(wq->cpu_pwqs);
else
- ret = -EINVAL;
+ free_workqueue_attrs(wq->unbound_attrs);
- free_workqueue_attrs(attrs);
- return ret ?: count;
+ kfree(wq->rescuer);
+ kfree(wq);
}
-static ssize_t wq_cpumask_show(struct device *dev,
- struct device_attribute *attr, char *buf)
+static void rcu_free_pool(struct rcu_head *rcu)
{
- struct workqueue_struct *wq = dev_to_wq(dev);
- int written;
+ struct worker_pool *pool = container_of(rcu, struct worker_pool, rcu);
- mutex_lock(&wq->mutex);
- written = scnprintf(buf, PAGE_SIZE, "%*pb\n",
- cpumask_pr_args(wq->unbound_attrs->cpumask));
- mutex_unlock(&wq->mutex);
- return written;
+ ida_destroy(&pool->worker_ida);
+ free_workqueue_attrs(pool->attrs);
+ kfree(pool);
}
-static ssize_t wq_cpumask_store(struct device *dev,
- struct device_attribute *attr,
- const char *buf, size_t count)
+/**
+ * put_unbound_pool - put a worker_pool
+ * @pool: worker_pool to put
+ *
+ * Put @pool. If its refcnt reaches zero, it gets destroyed in sched-RCU
+ * safe manner. get_unbound_pool() calls this function on its failure path
+ * and this function should be able to release pools which went through,
+ * successfully or not, init_worker_pool().
+ *
+ * Should be called with wq_pool_mutex held.
+ */
+static void put_unbound_pool(struct worker_pool *pool)
{
- struct workqueue_struct *wq = dev_to_wq(dev);
- struct workqueue_attrs *attrs;
- int ret;
+ DECLARE_COMPLETION_ONSTACK(detach_completion);
+ struct worker *worker;
- attrs = wq_sysfs_prep_attrs(wq);
- if (!attrs)
- return -ENOMEM;
+ lockdep_assert_held(&wq_pool_mutex);
- ret = cpumask_parse(buf, attrs->cpumask);
- if (!ret)
- ret = apply_workqueue_attrs(wq, attrs);
-
- free_workqueue_attrs(attrs);
- return ret ?: count;
-}
-
-static ssize_t wq_numa_show(struct device *dev, struct device_attribute *attr,
- char *buf)
-{
- struct workqueue_struct *wq = dev_to_wq(dev);
- int written;
-
- mutex_lock(&wq->mutex);
- written = scnprintf(buf, PAGE_SIZE, "%d\n",
- !wq->unbound_attrs->no_numa);
- mutex_unlock(&wq->mutex);
-
- return written;
-}
+ if (--pool->refcnt)
+ return;
-static ssize_t wq_numa_store(struct device *dev, struct device_attribute *attr,
- const char *buf, size_t count)
-{
- struct workqueue_struct *wq = dev_to_wq(dev);
- struct workqueue_attrs *attrs;
- int v, ret;
+ /* sanity checks */
+ if (WARN_ON(!(pool->cpu < 0)) ||
+ WARN_ON(!list_empty(&pool->worklist)))
+ return;
- attrs = wq_sysfs_prep_attrs(wq);
- if (!attrs)
- return -ENOMEM;
+ /* release id and unhash */
+ if (pool->id >= 0)
+ idr_remove(&worker_pool_idr, pool->id);
+ hash_del(&pool->hash_node);
- ret = -EINVAL;
- if (sscanf(buf, "%d", &v) == 1) {
- attrs->no_numa = !v;
- ret = apply_workqueue_attrs(wq, attrs);
- }
+ /*
+ * Become the manager and destroy all workers. Grabbing
+ * manager_arb prevents @pool's workers from blocking on
+ * attach_mutex.
+ */
+ mutex_lock(&pool->manager_arb);
- free_workqueue_attrs(attrs);
- return ret ?: count;
-}
+ spin_lock_irq(&pool->lock);
+ while ((worker = first_idle_worker(pool)))
+ destroy_worker(worker);
+ WARN_ON(pool->nr_workers || pool->nr_idle);
+ spin_unlock_irq(&pool->lock);
-static struct device_attribute wq_sysfs_unbound_attrs[] = {
- __ATTR(pool_ids, 0444, wq_pool_ids_show, NULL),
- __ATTR(nice, 0644, wq_nice_show, wq_nice_store),
- __ATTR(cpumask, 0644, wq_cpumask_show, wq_cpumask_store),
- __ATTR(numa, 0644, wq_numa_show, wq_numa_store),
- __ATTR_NULL,
-};
+ mutex_lock(&pool->attach_mutex);
+ if (!list_empty(&pool->workers))
+ pool->detach_completion = &detach_completion;
+ mutex_unlock(&pool->attach_mutex);
-static struct bus_type wq_subsys = {
- .name = "workqueue",
- .dev_groups = wq_sysfs_groups,
-};
+ if (pool->detach_completion)
+ wait_for_completion(pool->detach_completion);
-static int __init wq_sysfs_init(void)
-{
- return subsys_virtual_register(&wq_subsys, NULL);
-}
-core_initcall(wq_sysfs_init);
+ mutex_unlock(&pool->manager_arb);
-static void wq_device_release(struct device *dev)
-{
- struct wq_device *wq_dev = container_of(dev, struct wq_device, dev);
+ /* shut down the timers */
+ del_timer_sync(&pool->idle_timer);
+ del_timer_sync(&pool->mayday_timer);
- kfree(wq_dev);
+ /* sched-RCU protected to allow dereferences from get_work_pool() */
+ call_rcu_sched(&pool->rcu, rcu_free_pool);
}
/**
- * workqueue_sysfs_register - make a workqueue visible in sysfs
- * @wq: the workqueue to register
+ * get_unbound_pool - get a worker_pool with the specified attributes
+ * @attrs: the attributes of the worker_pool to get
*
- * Expose @wq in sysfs under /sys/bus/workqueue/devices.
- * alloc_workqueue*() automatically calls this function if WQ_SYSFS is set
- * which is the preferred method.
+ * Obtain a worker_pool which has the same attributes as @attrs, bump the
+ * reference count and return it. If there already is a matching
+ * worker_pool, it will be used; otherwise, this function attempts to
+ * create a new one.
*
- * Workqueue user should use this function directly iff it wants to apply
- * workqueue_attrs before making the workqueue visible in sysfs; otherwise,
- * apply_workqueue_attrs() may race against userland updating the
- * attributes.
+ * Should be called with wq_pool_mutex held.
*
- * Return: 0 on success, -errno on failure.
+ * Return: On success, a worker_pool with the same attributes as @attrs.
+ * On failure, %NULL.
*/
-int workqueue_sysfs_register(struct workqueue_struct *wq)
+static struct worker_pool *get_unbound_pool(const struct workqueue_attrs *attrs)
{
- struct wq_device *wq_dev;
- int ret;
+ u32 hash = wqattrs_hash(attrs);
+ struct worker_pool *pool;
+ int node;
- /*
- * Adjusting max_active or creating new pwqs by applyting
- * attributes breaks ordering guarantee. Disallow exposing ordered
- * workqueues.
- */
- if (WARN_ON(wq->flags & __WQ_ORDERED))
- return -EINVAL;
+ lockdep_assert_held(&wq_pool_mutex);
- wq->wq_dev = wq_dev = kzalloc(sizeof(*wq_dev), GFP_KERNEL);
- if (!wq_dev)
- return -ENOMEM;
+ /* do we already have a matching pool? */
+ hash_for_each_possible(unbound_pool_hash, pool, hash_node, hash) {
+ if (wqattrs_equal(pool->attrs, attrs)) {
+ pool->refcnt++;
+ return pool;
+ }
+ }
- wq_dev->wq = wq;
- wq_dev->dev.bus = &wq_subsys;
- wq_dev->dev.init_name = wq->name;
- wq_dev->dev.release = wq_device_release;
+ /* nope, create a new one */
+ pool = kzalloc(sizeof(*pool), GFP_KERNEL);
+ if (!pool || init_worker_pool(pool) < 0)
+ goto fail;
+
+ lockdep_set_subclass(&pool->lock, 1); /* see put_pwq() */
+ copy_workqueue_attrs(pool->attrs, attrs);
/*
- * unbound_attrs are created separately. Suppress uevent until
- * everything is ready.
+ * no_numa isn't a worker_pool attribute, always clear it. See
+ * 'struct workqueue_attrs' comments for detail.
*/
- dev_set_uevent_suppress(&wq_dev->dev, true);
-
- ret = device_register(&wq_dev->dev);
- if (ret) {
- kfree(wq_dev);
- wq->wq_dev = NULL;
- return ret;
- }
-
- if (wq->flags & WQ_UNBOUND) {
- struct device_attribute *attr;
+ pool->attrs->no_numa = false;
- for (attr = wq_sysfs_unbound_attrs; attr->attr.name; attr++) {
- ret = device_create_file(&wq_dev->dev, attr);
- if (ret) {
- device_unregister(&wq_dev->dev);
- wq->wq_dev = NULL;
- return ret;
+ /* if cpumask is contained inside a NUMA node, we belong to that node */
+ if (wq_numa_enabled) {
+ for_each_node(node) {
+ if (cpumask_subset(pool->attrs->cpumask,
+ wq_numa_possible_cpumask[node])) {
+ pool->node = node;
+ break;
}
}
}
- dev_set_uevent_suppress(&wq_dev->dev, false);
- kobject_uevent(&wq_dev->dev.kobj, KOBJ_ADD);
- return 0;
+ if (worker_pool_assign_id(pool) < 0)
+ goto fail;
+
+ /* create and start the initial worker */
+ if (!create_worker(pool))
+ goto fail;
+
+ /* install */
+ hash_add(unbound_pool_hash, &pool->hash_node, hash);
+
+ return pool;
+fail:
+ if (pool)
+ put_unbound_pool(pool);
+ return NULL;
}
-/**
- * workqueue_sysfs_unregister - undo workqueue_sysfs_register()
- * @wq: the workqueue to unregister
- *
- * If @wq is registered to sysfs by workqueue_sysfs_register(), unregister.
+static void rcu_free_pwq(struct rcu_head *rcu)
+{
+ kmem_cache_free(pwq_cache,
+ container_of(rcu, struct pool_workqueue, rcu));
+}
+
+/*
+ * Scheduled on system_wq by put_pwq() when an unbound pwq hits zero refcnt
+ * and needs to be destroyed.
*/
-static void workqueue_sysfs_unregister(struct workqueue_struct *wq)
+static void pwq_unbound_release_workfn(struct work_struct *work)
{
- struct wq_device *wq_dev = wq->wq_dev;
+ struct pool_workqueue *pwq = container_of(work, struct pool_workqueue,
+ unbound_release_work);
+ struct workqueue_struct *wq = pwq->wq;
+ struct worker_pool *pool = pwq->pool;
+ bool is_last;
- if (!wq->wq_dev)
+ if (WARN_ON_ONCE(!(wq->flags & WQ_UNBOUND)))
return;
- wq->wq_dev = NULL;
- device_unregister(&wq_dev->dev);
+ mutex_lock(&wq->mutex);
+ list_del_rcu(&pwq->pwqs_node);
+ is_last = list_empty(&wq->pwqs);
+ mutex_unlock(&wq->mutex);
+
+ mutex_lock(&wq_pool_mutex);
+ put_unbound_pool(pool);
+ mutex_unlock(&wq_pool_mutex);
+
+ call_rcu_sched(&pwq->rcu, rcu_free_pwq);
+
+ /*
+ * If we're the last pwq going away, @wq is already dead and no one
+ * is gonna access it anymore. Schedule RCU free.
+ */
+ if (is_last)
+ call_rcu_sched(&wq->rcu, rcu_free_wq);
}
-#else /* CONFIG_SYSFS */
-static void workqueue_sysfs_unregister(struct workqueue_struct *wq) { }
-#endif /* CONFIG_SYSFS */
/**
- * free_workqueue_attrs - free a workqueue_attrs
- * @attrs: workqueue_attrs to free
+ * pwq_adjust_max_active - update a pwq's max_active to the current setting
+ * @pwq: target pool_workqueue
*
- * Undo alloc_workqueue_attrs().
+ * If @pwq isn't freezing, set @pwq->max_active to the associated
+ * workqueue's saved_max_active and activate delayed work items
+ * accordingly. If @pwq is freezing, clear @pwq->max_active to zero.
*/
-void free_workqueue_attrs(struct workqueue_attrs *attrs)
+static void pwq_adjust_max_active(struct pool_workqueue *pwq)
{
- if (attrs) {
- free_cpumask_var(attrs->cpumask);
- kfree(attrs);
- }
-}
+ struct workqueue_struct *wq = pwq->wq;
+ bool freezable = wq->flags & WQ_FREEZABLE;
-/**
- * alloc_workqueue_attrs - allocate a workqueue_attrs
- * @gfp_mask: allocation mask to use
- *
- * Allocate a new workqueue_attrs, initialize with default settings and
- * return it.
- *
- * Return: The allocated new workqueue_attr on success. %NULL on failure.
- */
-struct workqueue_attrs *alloc_workqueue_attrs(gfp_t gfp_mask)
-{
- struct workqueue_attrs *attrs;
+ /* for @wq->saved_max_active */
+ lockdep_assert_held(&wq->mutex);
- attrs = kzalloc(sizeof(*attrs), gfp_mask);
- if (!attrs)
- goto fail;
- if (!alloc_cpumask_var(&attrs->cpumask, gfp_mask))
- goto fail;
+ /* fast exit for non-freezable wqs */
+ if (!freezable && pwq->max_active == wq->saved_max_active)
+ return;
- cpumask_copy(attrs->cpumask, cpu_possible_mask);
- return attrs;
-fail:
- free_workqueue_attrs(attrs);
- return NULL;
-}
+ spin_lock_irq(&pwq->pool->lock);
-static void copy_workqueue_attrs(struct workqueue_attrs *to,
- const struct workqueue_attrs *from)
-{
- to->nice = from->nice;
- cpumask_copy(to->cpumask, from->cpumask);
/*
- * Unlike hash and equality test, this function doesn't ignore
- * ->no_numa as it is used for both pool and wq attrs. Instead,
- * get_unbound_pool() explicitly clears ->no_numa after copying.
+ * During [un]freezing, the caller is responsible for ensuring that
+ * this function is called at least once after @workqueue_freezing
+ * is updated and visible.
*/
- to->no_numa = from->no_numa;
+ if (!freezable || !workqueue_freezing) {
+ pwq->max_active = wq->saved_max_active;
+
+ while (!list_empty(&pwq->delayed_works) &&
+ pwq->nr_active < pwq->max_active)
+ pwq_activate_first_delayed(pwq);
+
+ /*
+ * Need to kick a worker after thawed or an unbound wq's
+ * max_active is bumped. It's a slow path. Do it always.
+ */
+ wake_up_worker(pwq->pool);
+ } else {
+ pwq->max_active = 0;
+ }
+
+ spin_unlock_irq(&pwq->pool->lock);
}
-/* hash value of the content of @attr */
-static u32 wqattrs_hash(const struct workqueue_attrs *attrs)
+/* initialize newly alloced @pwq which is associated with @wq and @pool */
+static void init_pwq(struct pool_workqueue *pwq, struct workqueue_struct *wq,
+ struct worker_pool *pool)
{
- u32 hash = 0;
+ BUG_ON((unsigned long)pwq & WORK_STRUCT_FLAG_MASK);
- hash = jhash_1word(attrs->nice, hash);
- hash = jhash(cpumask_bits(attrs->cpumask),
- BITS_TO_LONGS(nr_cpumask_bits) * sizeof(long), hash);
- return hash;
+ memset(pwq, 0, sizeof(*pwq));
+
+ pwq->pool = pool;
+ pwq->wq = wq;
+ pwq->flush_color = -1;
+ pwq->refcnt = 1;
+ INIT_LIST_HEAD(&pwq->delayed_works);
+ INIT_LIST_HEAD(&pwq->pwqs_node);
+ INIT_LIST_HEAD(&pwq->mayday_node);
+ INIT_WORK(&pwq->unbound_release_work, pwq_unbound_release_workfn);
}
-/* content equality test */
-static bool wqattrs_equal(const struct workqueue_attrs *a,
- const struct workqueue_attrs *b)
+/* sync @pwq with the current state of its associated wq and link it */
+static void link_pwq(struct pool_workqueue *pwq)
{
- if (a->nice != b->nice)
- return false;
- if (!cpumask_equal(a->cpumask, b->cpumask))
- return false;
- return true;
+ struct workqueue_struct *wq = pwq->wq;
+
+ lockdep_assert_held(&wq->mutex);
+
+ /* may be called multiple times, ignore if already linked */
+ if (!list_empty(&pwq->pwqs_node))
+ return;
+
+ /* set the matching work_color */
+ pwq->work_color = wq->work_color;
+
+ /* sync max_active to the current setting */
+ pwq_adjust_max_active(pwq);
+
+ /* link in @pwq */
+ list_add_rcu(&pwq->pwqs_node, &wq->pwqs);
}
-/**
- * init_worker_pool - initialize a newly zalloc'd worker_pool
- * @pool: worker_pool to initialize
- *
- * Initiailize a newly zalloc'd @pool. It also allocates @pool->attrs.
- *
- * Return: 0 on success, -errno on failure. Even on failure, all fields
- * inside @pool proper are initialized and put_unbound_pool() can be called
- * on @pool safely to release it.
- */
-static int init_worker_pool(struct worker_pool *pool)
+/* obtain a pool matching @attr and create a pwq associating the pool and @wq */
+static struct pool_workqueue *alloc_unbound_pwq(struct workqueue_struct *wq,
+ const struct workqueue_attrs *attrs)
{
- spin_lock_init(&pool->lock);
- pool->id = -1;
- pool->cpu = -1;
- pool->node = NUMA_NO_NODE;
- pool->flags |= POOL_DISASSOCIATED;
- INIT_LIST_HEAD(&pool->worklist);
- INIT_LIST_HEAD(&pool->idle_list);
- hash_init(pool->busy_hash);
-
- init_timer_deferrable(&pool->idle_timer);
- pool->idle_timer.function = idle_worker_timeout;
- pool->idle_timer.data = (unsigned long)pool;
+ struct worker_pool *pool;
+ struct pool_workqueue *pwq;
- setup_timer(&pool->mayday_timer, pool_mayday_timeout,
- (unsigned long)pool);
+ lockdep_assert_held(&wq_pool_mutex);
- mutex_init(&pool->manager_arb);
- mutex_init(&pool->attach_mutex);
- INIT_LIST_HEAD(&pool->workers);
+ pool = get_unbound_pool(attrs);
+ if (!pool)
+ return NULL;
- ida_init(&pool->worker_ida);
- INIT_HLIST_NODE(&pool->hash_node);
- pool->refcnt = 1;
+ pwq = kmem_cache_alloc_node(pwq_cache, GFP_KERNEL, pool->node);
+ if (!pwq) {
+ put_unbound_pool(pool);
+ return NULL;
+ }
- /* shouldn't fail above this point */
- pool->attrs = alloc_workqueue_attrs(GFP_KERNEL);
- if (!pool->attrs)
- return -ENOMEM;
- return 0;
+ init_pwq(pwq, wq, pool);
+ return pwq;
}
-static void rcu_free_pool(struct rcu_head *rcu)
+/* undo alloc_unbound_pwq(), used only in the error path */
+static void free_unbound_pwq(struct pool_workqueue *pwq)
{
- struct worker_pool *pool = container_of(rcu, struct worker_pool, rcu);
+ lockdep_assert_held(&wq_pool_mutex);
- ida_destroy(&pool->worker_ida);
- free_workqueue_attrs(pool->attrs);
- kfree(pool);
+ if (pwq) {
+ put_unbound_pool(pwq->pool);
+ kmem_cache_free(pwq_cache, pwq);
+ }
}
/**
- * put_unbound_pool - put a worker_pool
- * @pool: worker_pool to put
+ * wq_calc_node_mask - calculate a wq_attrs' cpumask for the specified node
+ * @attrs: the wq_attrs of interest
+ * @node: the target NUMA node
+ * @cpu_going_down: if >= 0, the CPU to consider as offline
+ * @cpumask: outarg, the resulting cpumask
*
- * Put @pool. If its refcnt reaches zero, it gets destroyed in sched-RCU
- * safe manner. get_unbound_pool() calls this function on its failure path
- * and this function should be able to release pools which went through,
- * successfully or not, init_worker_pool().
+ * Calculate the cpumask a workqueue with @attrs should use on @node. If
+ * @cpu_going_down is >= 0, that cpu is considered offline during
+ * calculation. The result is stored in @cpumask.
*
- * Should be called with wq_pool_mutex held.
+ * If NUMA affinity is not enabled, @attrs->cpumask is always used. If
+ * enabled and @node has online CPUs requested by @attrs, the returned
+ * cpumask is the intersection of the possible CPUs of @node and
+ * @attrs->cpumask.
+ *
+ * The caller is responsible for ensuring that the cpumask of @node stays
+ * stable.
+ *
+ * Return: %true if the resulting @cpumask is different from @attrs->cpumask,
+ * %false if equal.
*/
-static void put_unbound_pool(struct worker_pool *pool)
+static bool wq_calc_node_cpumask(const struct workqueue_attrs *attrs, int node,
+ int cpu_going_down, cpumask_t *cpumask)
{
- DECLARE_COMPLETION_ONSTACK(detach_completion);
- struct worker *worker;
-
- lockdep_assert_held(&wq_pool_mutex);
-
- if (--pool->refcnt)
- return;
+ if (!wq_numa_enabled || attrs->no_numa)
+ goto use_dfl;
- /* sanity checks */
- if (WARN_ON(!(pool->cpu < 0)) ||
- WARN_ON(!list_empty(&pool->worklist)))
- return;
+ /* does @node have any online CPUs @attrs wants? */
+ cpumask_and(cpumask, cpumask_of_node(node), attrs->cpumask);
+ if (cpu_going_down >= 0)
+ cpumask_clear_cpu(cpu_going_down, cpumask);
- /* release id and unhash */
- if (pool->id >= 0)
- idr_remove(&worker_pool_idr, pool->id);
- hash_del(&pool->hash_node);
+ if (cpumask_empty(cpumask))
+ goto use_dfl;
- /*
- * Become the manager and destroy all workers. Grabbing
- * manager_arb prevents @pool's workers from blocking on
- * attach_mutex.
- */
- mutex_lock(&pool->manager_arb);
+ /* yeap, return possible CPUs in @node that @attrs wants */
+ cpumask_and(cpumask, attrs->cpumask, wq_numa_possible_cpumask[node]);
+ return !cpumask_equal(cpumask, attrs->cpumask);
- spin_lock_irq(&pool->lock);
- while ((worker = first_idle_worker(pool)))
- destroy_worker(worker);
- WARN_ON(pool->nr_workers || pool->nr_idle);
- spin_unlock_irq(&pool->lock);
+use_dfl:
+ cpumask_copy(cpumask, attrs->cpumask);
+ return false;
+}
- mutex_lock(&pool->attach_mutex);
- if (!list_empty(&pool->workers))
- pool->detach_completion = &detach_completion;
- mutex_unlock(&pool->attach_mutex);
+/* install @pwq into @wq's numa_pwq_tbl[] for @node and return the old pwq */
+static struct pool_workqueue *numa_pwq_tbl_install(struct workqueue_struct *wq,
+ int node,
+ struct pool_workqueue *pwq)
+{
+ struct pool_workqueue *old_pwq;
- if (pool->detach_completion)
- wait_for_completion(pool->detach_completion);
+ lockdep_assert_held(&wq->mutex);
- mutex_unlock(&pool->manager_arb);
+ /* link_pwq() can handle duplicate calls */
+ link_pwq(pwq);
- /* shut down the timers */
- del_timer_sync(&pool->idle_timer);
- del_timer_sync(&pool->mayday_timer);
-
- /* sched-RCU protected to allow dereferences from get_work_pool() */
- call_rcu_sched(&pool->rcu, rcu_free_pool);
-}
+ old_pwq = rcu_access_pointer(wq->numa_pwq_tbl[node]);
+ rcu_assign_pointer(wq->numa_pwq_tbl[node], pwq);
+ return old_pwq;
+}
/**
- * get_unbound_pool - get a worker_pool with the specified attributes
- * @attrs: the attributes of the worker_pool to get
+ * apply_workqueue_attrs - apply new workqueue_attrs to an unbound workqueue
+ * @wq: the target workqueue
+ * @attrs: the workqueue_attrs to apply, allocated with alloc_workqueue_attrs()
*
- * Obtain a worker_pool which has the same attributes as @attrs, bump the
- * reference count and return it. If there already is a matching
- * worker_pool, it will be used; otherwise, this function attempts to
- * create a new one.
+ * Apply @attrs to an unbound workqueue @wq. Unless disabled, on NUMA
+ * machines, this function maps a separate pwq to each NUMA node with
+ * possibles CPUs in @attrs->cpumask so that work items are affine to the
+ * NUMA node it was issued on. Older pwqs are released as in-flight work
+ * items finish. Note that a work item which repeatedly requeues itself
+ * back-to-back will stay on its current pwq.
*
- * Should be called with wq_pool_mutex held.
+ * Performs GFP_KERNEL allocations.
*
- * Return: On success, a worker_pool with the same attributes as @attrs.
- * On failure, %NULL.
+ * Return: 0 on success and -errno on failure.
*/
-static struct worker_pool *get_unbound_pool(const struct workqueue_attrs *attrs)
+int apply_workqueue_attrs(struct workqueue_struct *wq,
+ const struct workqueue_attrs *attrs)
{
- u32 hash = wqattrs_hash(attrs);
- struct worker_pool *pool;
- int node;
+ struct workqueue_attrs *new_attrs, *tmp_attrs;
+ struct pool_workqueue **pwq_tbl, *dfl_pwq;
+ int node, ret;
- lockdep_assert_held(&wq_pool_mutex);
+ /* only unbound workqueues can change attributes */
+ if (WARN_ON(!(wq->flags & WQ_UNBOUND)))
+ return -EINVAL;
- /* do we already have a matching pool? */
- hash_for_each_possible(unbound_pool_hash, pool, hash_node, hash) {
- if (wqattrs_equal(pool->attrs, attrs)) {
- pool->refcnt++;
- return pool;
- }
- }
+ /* creating multiple pwqs breaks ordering guarantee */
+ if (WARN_ON((wq->flags & __WQ_ORDERED) && !list_empty(&wq->pwqs)))
+ return -EINVAL;
- /* nope, create a new one */
- pool = kzalloc(sizeof(*pool), GFP_KERNEL);
- if (!pool || init_worker_pool(pool) < 0)
- goto fail;
+ pwq_tbl = kzalloc(nr_node_ids * sizeof(pwq_tbl[0]), GFP_KERNEL);
+ new_attrs = alloc_workqueue_attrs(GFP_KERNEL);
+ tmp_attrs = alloc_workqueue_attrs(GFP_KERNEL);
+ if (!pwq_tbl || !new_attrs || !tmp_attrs)
+ goto enomem;
- lockdep_set_subclass(&pool->lock, 1); /* see put_pwq() */
- copy_workqueue_attrs(pool->attrs, attrs);
+ /* make a copy of @attrs and sanitize it */
+ copy_workqueue_attrs(new_attrs, attrs);
+ cpumask_and(new_attrs->cpumask, new_attrs->cpumask, cpu_possible_mask);
/*
- * no_numa isn't a worker_pool attribute, always clear it. See
- * 'struct workqueue_attrs' comments for detail.
+ * We may create multiple pwqs with differing cpumasks. Make a
+ * copy of @new_attrs which will be modified and used to obtain
+ * pools.
*/
- pool->attrs->no_numa = false;
+ copy_workqueue_attrs(tmp_attrs, new_attrs);
- /* if cpumask is contained inside a NUMA node, we belong to that node */
- if (wq_numa_enabled) {
- for_each_node(node) {
- if (cpumask_subset(pool->attrs->cpumask,
- wq_numa_possible_cpumask[node])) {
- pool->node = node;
- break;
- }
- }
- }
+ /*
+ * CPUs should stay stable across pwq creations and installations.
+ * Pin CPUs, determine the target cpumask for each node and create
+ * pwqs accordingly.
+ */
+ get_online_cpus();
- if (worker_pool_assign_id(pool) < 0)
- goto fail;
+ mutex_lock(&wq_pool_mutex);
- /* create and start the initial worker */
- if (!create_worker(pool))
- goto fail;
+ /*
+ * If something goes wrong during CPU up/down, we'll fall back to
+ * the default pwq covering whole @attrs->cpumask. Always create
+ * it even if we don't use it immediately.
+ */
+ dfl_pwq = alloc_unbound_pwq(wq, new_attrs);
+ if (!dfl_pwq)
+ goto enomem_pwq;
- /* install */
- hash_add(unbound_pool_hash, &pool->hash_node, hash);
+ for_each_node(node) {
+ if (wq_calc_node_cpumask(attrs, node, -1, tmp_attrs->cpumask)) {
+ pwq_tbl[node] = alloc_unbound_pwq(wq, tmp_attrs);
+ if (!pwq_tbl[node])
+ goto enomem_pwq;
+ } else {
+ dfl_pwq->refcnt++;
+ pwq_tbl[node] = dfl_pwq;
+ }
+ }
- return pool;
-fail:
- if (pool)
- put_unbound_pool(pool);
- return NULL;
-}
+ mutex_unlock(&wq_pool_mutex);
-static void rcu_free_pwq(struct rcu_head *rcu)
-{
- kmem_cache_free(pwq_cache,
- container_of(rcu, struct pool_workqueue, rcu));
-}
+ /* all pwqs have been created successfully, let's install'em */
+ mutex_lock(&wq->mutex);
-/*
- * Scheduled on system_wq by put_pwq() when an unbound pwq hits zero refcnt
- * and needs to be destroyed.
- */
-static void pwq_unbound_release_workfn(struct work_struct *work)
-{
- struct pool_workqueue *pwq = container_of(work, struct pool_workqueue,
- unbound_release_work);
- struct workqueue_struct *wq = pwq->wq;
- struct worker_pool *pool = pwq->pool;
- bool is_last;
+ copy_workqueue_attrs(wq->unbound_attrs, new_attrs);
- if (WARN_ON_ONCE(!(wq->flags & WQ_UNBOUND)))
- return;
+ /* save the previous pwq and install the new one */
+ for_each_node(node)
+ pwq_tbl[node] = numa_pwq_tbl_install(wq, node, pwq_tbl[node]);
+
+ /* @dfl_pwq might not have been used, ensure it's linked */
+ link_pwq(dfl_pwq);
+ swap(wq->dfl_pwq, dfl_pwq);
- mutex_lock(&wq->mutex);
- list_del_rcu(&pwq->pwqs_node);
- is_last = list_empty(&wq->pwqs);
mutex_unlock(&wq->mutex);
- mutex_lock(&wq_pool_mutex);
- put_unbound_pool(pool);
- mutex_unlock(&wq_pool_mutex);
+ /* put the old pwqs */
+ for_each_node(node)
+ put_pwq_unlocked(pwq_tbl[node]);
+ put_pwq_unlocked(dfl_pwq);
- call_rcu_sched(&pwq->rcu, rcu_free_pwq);
+ put_online_cpus();
+ ret = 0;
+ /* fall through */
+out_free:
+ free_workqueue_attrs(tmp_attrs);
+ free_workqueue_attrs(new_attrs);
+ kfree(pwq_tbl);
+ return ret;
- /*
- * If we're the last pwq going away, @wq is already dead and no one
- * is gonna access it anymore. Free it.
- */
- if (is_last) {
- free_workqueue_attrs(wq->unbound_attrs);
- kfree(wq);
- }
+enomem_pwq:
+ free_unbound_pwq(dfl_pwq);
+ for_each_node(node)
+ if (pwq_tbl && pwq_tbl[node] != dfl_pwq)
+ free_unbound_pwq(pwq_tbl[node]);
+ mutex_unlock(&wq_pool_mutex);
+ put_online_cpus();
+enomem:
+ ret = -ENOMEM;
+ goto out_free;
}
/**
- * pwq_adjust_max_active - update a pwq's max_active to the current setting
- * @pwq: target pool_workqueue
+ * wq_update_unbound_numa - update NUMA affinity of a wq for CPU hot[un]plug
+ * @wq: the target workqueue
+ * @cpu: the CPU coming up or going down
+ * @online: whether @cpu is coming up or going down
*
- * If @pwq isn't freezing, set @pwq->max_active to the associated
- * workqueue's saved_max_active and activate delayed work items
- * accordingly. If @pwq is freezing, clear @pwq->max_active to zero.
+ * This function is to be called from %CPU_DOWN_PREPARE, %CPU_ONLINE and
+ * %CPU_DOWN_FAILED. @cpu is being hot[un]plugged, update NUMA affinity of
+ * @wq accordingly.
+ *
+ * If NUMA affinity can't be adjusted due to memory allocation failure, it
+ * falls back to @wq->dfl_pwq which may not be optimal but is always
+ * correct.
+ *
+ * Note that when the last allowed CPU of a NUMA node goes offline for a
+ * workqueue with a cpumask spanning multiple nodes, the workers which were
+ * already executing the work items for the workqueue will lose their CPU
+ * affinity and may execute on any CPU. This is similar to how per-cpu
+ * workqueues behave on CPU_DOWN. If a workqueue user wants strict
+ * affinity, it's the user's responsibility to flush the work item from
+ * CPU_DOWN_PREPARE.
*/
-static void pwq_adjust_max_active(struct pool_workqueue *pwq)
+static void wq_update_unbound_numa(struct workqueue_struct *wq, int cpu,
+ bool online)
{
- struct workqueue_struct *wq = pwq->wq;
- bool freezable = wq->flags & WQ_FREEZABLE;
+ int node = cpu_to_node(cpu);
+ int cpu_off = online ? -1 : cpu;
+ struct pool_workqueue *old_pwq = NULL, *pwq;
+ struct workqueue_attrs *target_attrs;
+ cpumask_t *cpumask;
- /* for @wq->saved_max_active */
- lockdep_assert_held(&wq->mutex);
+ lockdep_assert_held(&wq_pool_mutex);
- /* fast exit for non-freezable wqs */
- if (!freezable && pwq->max_active == wq->saved_max_active)
+ if (!wq_numa_enabled || !(wq->flags & WQ_UNBOUND))
return;
- spin_lock_irq(&pwq->pool->lock);
-
/*
- * During [un]freezing, the caller is responsible for ensuring that
- * this function is called at least once after @workqueue_freezing
- * is updated and visible.
+ * We don't wanna alloc/free wq_attrs for each wq for each CPU.
+ * Let's use a preallocated one. The following buf is protected by
+ * CPU hotplug exclusion.
*/
- if (!freezable || !workqueue_freezing) {
- pwq->max_active = wq->saved_max_active;
+ target_attrs = wq_update_unbound_numa_attrs_buf;
+ cpumask = target_attrs->cpumask;
- while (!list_empty(&pwq->delayed_works) &&
- pwq->nr_active < pwq->max_active)
- pwq_activate_first_delayed(pwq);
+ mutex_lock(&wq->mutex);
+ if (wq->unbound_attrs->no_numa)
+ goto out_unlock;
- /*
- * Need to kick a worker after thawed or an unbound wq's
- * max_active is bumped. It's a slow path. Do it always.
- */
- wake_up_worker(pwq->pool);
+ copy_workqueue_attrs(target_attrs, wq->unbound_attrs);
+ pwq = unbound_pwq_by_node(wq, node);
+
+ /*
+ * Let's determine what needs to be done. If the target cpumask is
+ * different from wq's, we need to compare it to @pwq's and create
+ * a new one if they don't match. If the target cpumask equals
+ * wq's, the default pwq should be used.
+ */
+ if (wq_calc_node_cpumask(wq->unbound_attrs, node, cpu_off, cpumask)) {
+ if (cpumask_equal(cpumask, pwq->pool->attrs->cpumask))
+ goto out_unlock;
} else {
- pwq->max_active = 0;
+ goto use_dfl_pwq;
}
- spin_unlock_irq(&pwq->pool->lock);
-}
+ mutex_unlock(&wq->mutex);
-/* initialize newly alloced @pwq which is associated with @wq and @pool */
-static void init_pwq(struct pool_workqueue *pwq, struct workqueue_struct *wq,
- struct worker_pool *pool)
-{
- BUG_ON((unsigned long)pwq & WORK_STRUCT_FLAG_MASK);
+ /* create a new pwq */
+ pwq = alloc_unbound_pwq(wq, target_attrs);
+ if (!pwq) {
+ pr_warn("workqueue: allocation failed while updating NUMA affinity of \"%s\"\n",
+ wq->name);
+ mutex_lock(&wq->mutex);
+ goto use_dfl_pwq;
+ }
- memset(pwq, 0, sizeof(*pwq));
+ /*
+ * Install the new pwq. As this function is called only from CPU
+ * hotplug callbacks and applying a new attrs is wrapped with
+ * get/put_online_cpus(), @wq->unbound_attrs couldn't have changed
+ * inbetween.
+ */
+ mutex_lock(&wq->mutex);
+ old_pwq = numa_pwq_tbl_install(wq, node, pwq);
+ goto out_unlock;
- pwq->pool = pool;
- pwq->wq = wq;
- pwq->flush_color = -1;
- pwq->refcnt = 1;
- INIT_LIST_HEAD(&pwq->delayed_works);
- INIT_LIST_HEAD(&pwq->pwqs_node);
- INIT_LIST_HEAD(&pwq->mayday_node);
- INIT_WORK(&pwq->unbound_release_work, pwq_unbound_release_workfn);
+use_dfl_pwq:
+ spin_lock_irq(&wq->dfl_pwq->pool->lock);
+ get_pwq(wq->dfl_pwq);
+ spin_unlock_irq(&wq->dfl_pwq->pool->lock);
+ old_pwq = numa_pwq_tbl_install(wq, node, wq->dfl_pwq);
+out_unlock:
+ mutex_unlock(&wq->mutex);
+ put_pwq_unlocked(old_pwq);
}
-/* sync @pwq with the current state of its associated wq and link it */
-static void link_pwq(struct pool_workqueue *pwq)
+static int alloc_and_link_pwqs(struct workqueue_struct *wq)
{
- struct workqueue_struct *wq = pwq->wq;
+ bool highpri = wq->flags & WQ_HIGHPRI;
+ int cpu, ret;
- lockdep_assert_held(&wq->mutex);
+ if (!(wq->flags & WQ_UNBOUND)) {
+ wq->cpu_pwqs = alloc_percpu(struct pool_workqueue);
+ if (!wq->cpu_pwqs)
+ return -ENOMEM;
- /* may be called multiple times, ignore if already linked */
- if (!list_empty(&pwq->pwqs_node))
- return;
+ for_each_possible_cpu(cpu) {
+ struct pool_workqueue *pwq =
+ per_cpu_ptr(wq->cpu_pwqs, cpu);
+ struct worker_pool *cpu_pools =
+ per_cpu(cpu_worker_pools, cpu);
- /* set the matching work_color */
- pwq->work_color = wq->work_color;
+ init_pwq(pwq, wq, &cpu_pools[highpri]);
- /* sync max_active to the current setting */
- pwq_adjust_max_active(pwq);
+ mutex_lock(&wq->mutex);
+ link_pwq(pwq);
+ mutex_unlock(&wq->mutex);
+ }
+ return 0;
+ } else if (wq->flags & __WQ_ORDERED) {
+ ret = apply_workqueue_attrs(wq, ordered_wq_attrs[highpri]);
+ /* there should only be single pwq for ordering guarantee */
+ WARN(!ret && (wq->pwqs.next != &wq->dfl_pwq->pwqs_node ||
+ wq->pwqs.prev != &wq->dfl_pwq->pwqs_node),
+ "ordering guarantee broken for workqueue %s\n", wq->name);
+ return ret;
+ } else {
+ return apply_workqueue_attrs(wq, unbound_std_wq_attrs[highpri]);
+ }
+}
- /* link in @pwq */
- list_add_rcu(&pwq->pwqs_node, &wq->pwqs);
+static int wq_clamp_max_active(int max_active, unsigned int flags,
+ const char *name)
+{
+ int lim = flags & WQ_UNBOUND ? WQ_UNBOUND_MAX_ACTIVE : WQ_MAX_ACTIVE;
+
+ if (max_active < 1 || max_active > lim)
+ pr_warn("workqueue: max_active %d requested for %s is out of range, clamping between %d and %d\n",
+ max_active, name, 1, lim);
+
+ return clamp_val(max_active, 1, lim);
}
-/* obtain a pool matching @attr and create a pwq associating the pool and @wq */
-static struct pool_workqueue *alloc_unbound_pwq(struct workqueue_struct *wq,
- const struct workqueue_attrs *attrs)
+struct workqueue_struct *__alloc_workqueue_key(const char *fmt,
+ unsigned int flags,
+ int max_active,
+ struct lock_class_key *key,
+ const char *lock_name, ...)
{
- struct worker_pool *pool;
+ size_t tbl_size = 0;
+ va_list args;
+ struct workqueue_struct *wq;
struct pool_workqueue *pwq;
- lockdep_assert_held(&wq_pool_mutex);
+ /* see the comment above the definition of WQ_POWER_EFFICIENT */
+ if ((flags & WQ_POWER_EFFICIENT) && wq_power_efficient)
+ flags |= WQ_UNBOUND;
- pool = get_unbound_pool(attrs);
- if (!pool)
- return NULL;
+ /* allocate wq and format name */
+ if (flags & WQ_UNBOUND)
+ tbl_size = nr_node_ids * sizeof(wq->numa_pwq_tbl[0]);
- pwq = kmem_cache_alloc_node(pwq_cache, GFP_KERNEL, pool->node);
- if (!pwq) {
- put_unbound_pool(pool);
+ wq = kzalloc(sizeof(*wq) + tbl_size, GFP_KERNEL);
+ if (!wq)
return NULL;
+
+ if (flags & WQ_UNBOUND) {
+ wq->unbound_attrs = alloc_workqueue_attrs(GFP_KERNEL);
+ if (!wq->unbound_attrs)
+ goto err_free_wq;
}
- init_pwq(pwq, wq, pool);
- return pwq;
-}
+ va_start(args, lock_name);
+ vsnprintf(wq->name, sizeof(wq->name), fmt, args);
+ va_end(args);
-/* undo alloc_unbound_pwq(), used only in the error path */
-static void free_unbound_pwq(struct pool_workqueue *pwq)
-{
- lockdep_assert_held(&wq_pool_mutex);
+ max_active = max_active ?: WQ_DFL_ACTIVE;
+ max_active = wq_clamp_max_active(max_active, flags, wq->name);
- if (pwq) {
- put_unbound_pool(pwq->pool);
- kmem_cache_free(pwq_cache, pwq);
- }
-}
+ /* init wq */
+ wq->flags = flags;
+ wq->saved_max_active = max_active;
+ mutex_init(&wq->mutex);
+ atomic_set(&wq->nr_pwqs_to_flush, 0);
+ INIT_LIST_HEAD(&wq->pwqs);
+ INIT_LIST_HEAD(&wq->flusher_queue);
+ INIT_LIST_HEAD(&wq->flusher_overflow);
+ INIT_LIST_HEAD(&wq->maydays);
-/**
- * wq_calc_node_mask - calculate a wq_attrs' cpumask for the specified node
- * @attrs: the wq_attrs of interest
- * @node: the target NUMA node
- * @cpu_going_down: if >= 0, the CPU to consider as offline
- * @cpumask: outarg, the resulting cpumask
- *
- * Calculate the cpumask a workqueue with @attrs should use on @node. If
- * @cpu_going_down is >= 0, that cpu is considered offline during
- * calculation. The result is stored in @cpumask.
- *
- * If NUMA affinity is not enabled, @attrs->cpumask is always used. If
- * enabled and @node has online CPUs requested by @attrs, the returned
- * cpumask is the intersection of the possible CPUs of @node and
- * @attrs->cpumask.
- *
- * The caller is responsible for ensuring that the cpumask of @node stays
- * stable.
- *
- * Return: %true if the resulting @cpumask is different from @attrs->cpumask,
- * %false if equal.
- */
-static bool wq_calc_node_cpumask(const struct workqueue_attrs *attrs, int node,
- int cpu_going_down, cpumask_t *cpumask)
-{
- if (!wq_numa_enabled || attrs->no_numa)
- goto use_dfl;
+ lockdep_init_map(&wq->lockdep_map, lock_name, key, 0);
+ INIT_LIST_HEAD(&wq->list);
- /* does @node have any online CPUs @attrs wants? */
- cpumask_and(cpumask, cpumask_of_node(node), attrs->cpumask);
- if (cpu_going_down >= 0)
- cpumask_clear_cpu(cpu_going_down, cpumask);
+ if (alloc_and_link_pwqs(wq) < 0)
+ goto err_free_wq;
- if (cpumask_empty(cpumask))
- goto use_dfl;
+ /*
+ * Workqueues which may be used during memory reclaim should
+ * have a rescuer to guarantee forward progress.
+ */
+ if (flags & WQ_MEM_RECLAIM) {
+ struct worker *rescuer;
- /* yeap, return possible CPUs in @node that @attrs wants */
- cpumask_and(cpumask, attrs->cpumask, wq_numa_possible_cpumask[node]);
- return !cpumask_equal(cpumask, attrs->cpumask);
+ rescuer = alloc_worker(NUMA_NO_NODE);
+ if (!rescuer)
+ goto err_destroy;
-use_dfl:
- cpumask_copy(cpumask, attrs->cpumask);
- return false;
-}
+ rescuer->rescue_wq = wq;
+ rescuer->task = kthread_create(rescuer_thread, rescuer, "%s",
+ wq->name);
+ if (IS_ERR(rescuer->task)) {
+ kfree(rescuer);
+ goto err_destroy;
+ }
-/* install @pwq into @wq's numa_pwq_tbl[] for @node and return the old pwq */
-static struct pool_workqueue *numa_pwq_tbl_install(struct workqueue_struct *wq,
- int node,
- struct pool_workqueue *pwq)
-{
- struct pool_workqueue *old_pwq;
+ wq->rescuer = rescuer;
+ rescuer->task->flags |= PF_NO_SETAFFINITY;
+ wake_up_process(rescuer->task);
+ }
- lockdep_assert_held(&wq->mutex);
+ if ((wq->flags & WQ_SYSFS) && workqueue_sysfs_register(wq))
+ goto err_destroy;
- /* link_pwq() can handle duplicate calls */
- link_pwq(pwq);
+ /*
+ * wq_pool_mutex protects global freeze state and workqueues list.
+ * Grab it, adjust max_active and add the new @wq to workqueues
+ * list.
+ */
+ mutex_lock(&wq_pool_mutex);
- old_pwq = rcu_access_pointer(wq->numa_pwq_tbl[node]);
- rcu_assign_pointer(wq->numa_pwq_tbl[node], pwq);
- return old_pwq;
+ mutex_lock(&wq->mutex);
+ for_each_pwq(pwq, wq)
+ pwq_adjust_max_active(pwq);
+ mutex_unlock(&wq->mutex);
+
+ list_add_tail_rcu(&wq->list, &workqueues);
+
+ mutex_unlock(&wq_pool_mutex);
+
+ return wq;
+
+err_free_wq:
+ free_workqueue_attrs(wq->unbound_attrs);
+ kfree(wq);
+ return NULL;
+err_destroy:
+ destroy_workqueue(wq);
+ return NULL;
}
+EXPORT_SYMBOL_GPL(__alloc_workqueue_key);
/**
- * apply_workqueue_attrs - apply new workqueue_attrs to an unbound workqueue
- * @wq: the target workqueue
- * @attrs: the workqueue_attrs to apply, allocated with alloc_workqueue_attrs()
- *
- * Apply @attrs to an unbound workqueue @wq. Unless disabled, on NUMA
- * machines, this function maps a separate pwq to each NUMA node with
- * possibles CPUs in @attrs->cpumask so that work items are affine to the
- * NUMA node it was issued on. Older pwqs are released as in-flight work
- * items finish. Note that a work item which repeatedly requeues itself
- * back-to-back will stay on its current pwq.
- *
- * Performs GFP_KERNEL allocations.
+ * destroy_workqueue - safely terminate a workqueue
+ * @wq: target workqueue
*
- * Return: 0 on success and -errno on failure.
+ * Safely destroy a workqueue. All work currently pending will be done first.
*/
-int apply_workqueue_attrs(struct workqueue_struct *wq,
- const struct workqueue_attrs *attrs)
+void destroy_workqueue(struct workqueue_struct *wq)
{
- struct workqueue_attrs *new_attrs, *tmp_attrs;
- struct pool_workqueue **pwq_tbl, *dfl_pwq;
- int node, ret;
-
- /* only unbound workqueues can change attributes */
- if (WARN_ON(!(wq->flags & WQ_UNBOUND)))
- return -EINVAL;
+ struct pool_workqueue *pwq;
+ int node;
- /* creating multiple pwqs breaks ordering guarantee */
- if (WARN_ON((wq->flags & __WQ_ORDERED) && !list_empty(&wq->pwqs)))
- return -EINVAL;
+ /* drain it before proceeding with destruction */
+ drain_workqueue(wq);
- pwq_tbl = kzalloc(nr_node_ids * sizeof(pwq_tbl[0]), GFP_KERNEL);
- new_attrs = alloc_workqueue_attrs(GFP_KERNEL);
- tmp_attrs = alloc_workqueue_attrs(GFP_KERNEL);
- if (!pwq_tbl || !new_attrs || !tmp_attrs)
- goto enomem;
+ /* sanity checks */
+ mutex_lock(&wq->mutex);
+ for_each_pwq(pwq, wq) {
+ int i;
- /* make a copy of @attrs and sanitize it */
- copy_workqueue_attrs(new_attrs, attrs);
- cpumask_and(new_attrs->cpumask, new_attrs->cpumask, cpu_possible_mask);
+ for (i = 0; i < WORK_NR_COLORS; i++) {
+ if (WARN_ON(pwq->nr_in_flight[i])) {
+ mutex_unlock(&wq->mutex);
+ return;
+ }
+ }
- /*
- * We may create multiple pwqs with differing cpumasks. Make a
- * copy of @new_attrs which will be modified and used to obtain
- * pools.
- */
- copy_workqueue_attrs(tmp_attrs, new_attrs);
+ if (WARN_ON((pwq != wq->dfl_pwq) && (pwq->refcnt > 1)) ||
+ WARN_ON(pwq->nr_active) ||
+ WARN_ON(!list_empty(&pwq->delayed_works))) {
+ mutex_unlock(&wq->mutex);
+ return;
+ }
+ }
+ mutex_unlock(&wq->mutex);
/*
- * CPUs should stay stable across pwq creations and installations.
- * Pin CPUs, determine the target cpumask for each node and create
- * pwqs accordingly.
+ * wq list is used to freeze wq, remove from list after
+ * flushing is complete in case freeze races us.
*/
- get_online_cpus();
-
mutex_lock(&wq_pool_mutex);
+ list_del_rcu(&wq->list);
+ mutex_unlock(&wq_pool_mutex);
- /*
- * If something goes wrong during CPU up/down, we'll fall back to
- * the default pwq covering whole @attrs->cpumask. Always create
- * it even if we don't use it immediately.
- */
- dfl_pwq = alloc_unbound_pwq(wq, new_attrs);
- if (!dfl_pwq)
- goto enomem_pwq;
+ workqueue_sysfs_unregister(wq);
- for_each_node(node) {
- if (wq_calc_node_cpumask(attrs, node, -1, tmp_attrs->cpumask)) {
- pwq_tbl[node] = alloc_unbound_pwq(wq, tmp_attrs);
- if (!pwq_tbl[node])
- goto enomem_pwq;
- } else {
- dfl_pwq->refcnt++;
- pwq_tbl[node] = dfl_pwq;
+ if (wq->rescuer)
+ kthread_stop(wq->rescuer->task);
+
+ if (!(wq->flags & WQ_UNBOUND)) {
+ /*
+ * The base ref is never dropped on per-cpu pwqs. Directly
+ * schedule RCU free.
+ */
+ call_rcu_sched(&wq->rcu, rcu_free_wq);
+ } else {
+ /*
+ * We're the sole accessor of @wq at this point. Directly
+ * access numa_pwq_tbl[] and dfl_pwq to put the base refs.
+ * @wq will be freed when the last pwq is released.
+ */
+ for_each_node(node) {
+ pwq = rcu_access_pointer(wq->numa_pwq_tbl[node]);
+ RCU_INIT_POINTER(wq->numa_pwq_tbl[node], NULL);
+ put_pwq_unlocked(pwq);
}
+
+ /*
+ * Put dfl_pwq. @wq may be freed any time after dfl_pwq is
+ * put. Don't access it afterwards.
+ */
+ pwq = wq->dfl_pwq;
+ wq->dfl_pwq = NULL;
+ put_pwq_unlocked(pwq);
}
+}
+EXPORT_SYMBOL_GPL(destroy_workqueue);
- mutex_unlock(&wq_pool_mutex);
+/**
+ * workqueue_set_max_active - adjust max_active of a workqueue
+ * @wq: target workqueue
+ * @max_active: new max_active value.
+ *
+ * Set max_active of @wq to @max_active.
+ *
+ * CONTEXT:
+ * Don't call from IRQ context.
+ */
+void workqueue_set_max_active(struct workqueue_struct *wq, int max_active)
+{
+ struct pool_workqueue *pwq;
- /* all pwqs have been created successfully, let's install'em */
- mutex_lock(&wq->mutex);
+ /* disallow meddling with max_active for ordered workqueues */
+ if (WARN_ON(wq->flags & __WQ_ORDERED))
+ return;
- copy_workqueue_attrs(wq->unbound_attrs, new_attrs);
+ max_active = wq_clamp_max_active(max_active, wq->flags, wq->name);
- /* save the previous pwq and install the new one */
- for_each_node(node)
- pwq_tbl[node] = numa_pwq_tbl_install(wq, node, pwq_tbl[node]);
+ mutex_lock(&wq->mutex);
- /* @dfl_pwq might not have been used, ensure it's linked */
- link_pwq(dfl_pwq);
- swap(wq->dfl_pwq, dfl_pwq);
+ wq->saved_max_active = max_active;
- mutex_unlock(&wq->mutex);
+ for_each_pwq(pwq, wq)
+ pwq_adjust_max_active(pwq);
- /* put the old pwqs */
- for_each_node(node)
- put_pwq_unlocked(pwq_tbl[node]);
- put_pwq_unlocked(dfl_pwq);
+ mutex_unlock(&wq->mutex);
+}
+EXPORT_SYMBOL_GPL(workqueue_set_max_active);
- put_online_cpus();
- ret = 0;
- /* fall through */
-out_free:
- free_workqueue_attrs(tmp_attrs);
- free_workqueue_attrs(new_attrs);
- kfree(pwq_tbl);
- return ret;
+/**
+ * current_is_workqueue_rescuer - is %current workqueue rescuer?
+ *
+ * Determine whether %current is a workqueue rescuer. Can be used from
+ * work functions to determine whether it's being run off the rescuer task.
+ *
+ * Return: %true if %current is a workqueue rescuer. %false otherwise.
+ */
+bool current_is_workqueue_rescuer(void)
+{
+ struct worker *worker = current_wq_worker();
-enomem_pwq:
- free_unbound_pwq(dfl_pwq);
- for_each_node(node)
- if (pwq_tbl && pwq_tbl[node] != dfl_pwq)
- free_unbound_pwq(pwq_tbl[node]);
- mutex_unlock(&wq_pool_mutex);
- put_online_cpus();
-enomem:
- ret = -ENOMEM;
- goto out_free;
+ return worker && worker->rescue_wq;
}
/**
- * wq_update_unbound_numa - update NUMA affinity of a wq for CPU hot[un]plug
- * @wq: the target workqueue
- * @cpu: the CPU coming up or going down
- * @online: whether @cpu is coming up or going down
+ * workqueue_congested - test whether a workqueue is congested
+ * @cpu: CPU in question
+ * @wq: target workqueue
*
- * This function is to be called from %CPU_DOWN_PREPARE, %CPU_ONLINE and
- * %CPU_DOWN_FAILED. @cpu is being hot[un]plugged, update NUMA affinity of
- * @wq accordingly.
+ * Test whether @wq's cpu workqueue for @cpu is congested. There is
+ * no synchronization around this function and the test result is
+ * unreliable and only useful as advisory hints or for debugging.
*
- * If NUMA affinity can't be adjusted due to memory allocation failure, it
- * falls back to @wq->dfl_pwq which may not be optimal but is always
- * correct.
+ * If @cpu is WORK_CPU_UNBOUND, the test is performed on the local CPU.
+ * Note that both per-cpu and unbound workqueues may be associated with
+ * multiple pool_workqueues which have separate congested states. A
+ * workqueue being congested on one CPU doesn't mean the workqueue is also
+ * contested on other CPUs / NUMA nodes.
*
- * Note that when the last allowed CPU of a NUMA node goes offline for a
- * workqueue with a cpumask spanning multiple nodes, the workers which were
- * already executing the work items for the workqueue will lose their CPU
- * affinity and may execute on any CPU. This is similar to how per-cpu
- * workqueues behave on CPU_DOWN. If a workqueue user wants strict
- * affinity, it's the user's responsibility to flush the work item from
- * CPU_DOWN_PREPARE.
+ * Return:
+ * %true if congested, %false otherwise.
*/
-static void wq_update_unbound_numa(struct workqueue_struct *wq, int cpu,
- bool online)
+bool workqueue_congested(int cpu, struct workqueue_struct *wq)
{
- int node = cpu_to_node(cpu);
- int cpu_off = online ? -1 : cpu;
- struct pool_workqueue *old_pwq = NULL, *pwq;
- struct workqueue_attrs *target_attrs;
- cpumask_t *cpumask;
+ struct pool_workqueue *pwq;
+ bool ret;
- lockdep_assert_held(&wq_pool_mutex);
+ rcu_read_lock_sched();
- if (!wq_numa_enabled || !(wq->flags & WQ_UNBOUND))
- return;
+ if (cpu == WORK_CPU_UNBOUND)
+ cpu = smp_processor_id();
- /*
- * We don't wanna alloc/free wq_attrs for each wq for each CPU.
- * Let's use a preallocated one. The following buf is protected by
- * CPU hotplug exclusion.
- */
- target_attrs = wq_update_unbound_numa_attrs_buf;
- cpumask = target_attrs->cpumask;
+ if (!(wq->flags & WQ_UNBOUND))
+ pwq = per_cpu_ptr(wq->cpu_pwqs, cpu);
+ else
+ pwq = unbound_pwq_by_node(wq, cpu_to_node(cpu));
- mutex_lock(&wq->mutex);
- if (wq->unbound_attrs->no_numa)
- goto out_unlock;
+ ret = !list_empty(&pwq->delayed_works);
+ rcu_read_unlock_sched();
- copy_workqueue_attrs(target_attrs, wq->unbound_attrs);
- pwq = unbound_pwq_by_node(wq, node);
+ return ret;
+}
+EXPORT_SYMBOL_GPL(workqueue_congested);
- /*
- * Let's determine what needs to be done. If the target cpumask is
- * different from wq's, we need to compare it to @pwq's and create
- * a new one if they don't match. If the target cpumask equals
- * wq's, the default pwq should be used.
- */
- if (wq_calc_node_cpumask(wq->unbound_attrs, node, cpu_off, cpumask)) {
- if (cpumask_equal(cpumask, pwq->pool->attrs->cpumask))
- goto out_unlock;
- } else {
- goto use_dfl_pwq;
- }
+/**
+ * work_busy - test whether a work is currently pending or running
+ * @work: the work to be tested
+ *
+ * Test whether @work is currently pending or running. There is no
+ * synchronization around this function and the test result is
+ * unreliable and only useful as advisory hints or for debugging.
+ *
+ * Return:
+ * OR'd bitmask of WORK_BUSY_* bits.
+ */
+unsigned int work_busy(struct work_struct *work)
+{
+ struct worker_pool *pool;
+ unsigned long flags;
+ unsigned int ret = 0;
- mutex_unlock(&wq->mutex);
+ if (work_pending(work))
+ ret |= WORK_BUSY_PENDING;
- /* create a new pwq */
- pwq = alloc_unbound_pwq(wq, target_attrs);
- if (!pwq) {
- pr_warn("workqueue: allocation failed while updating NUMA affinity of \"%s\"\n",
- wq->name);
- mutex_lock(&wq->mutex);
- goto use_dfl_pwq;
+ local_irq_save(flags);
+ pool = get_work_pool(work);
+ if (pool) {
+ spin_lock(&pool->lock);
+ if (find_worker_executing_work(pool, work))
+ ret |= WORK_BUSY_RUNNING;
+ spin_unlock(&pool->lock);
}
+ local_irq_restore(flags);
- /*
- * Install the new pwq. As this function is called only from CPU
- * hotplug callbacks and applying a new attrs is wrapped with
- * get/put_online_cpus(), @wq->unbound_attrs couldn't have changed
- * inbetween.
- */
- mutex_lock(&wq->mutex);
- old_pwq = numa_pwq_tbl_install(wq, node, pwq);
- goto out_unlock;
+ return ret;
+}
+EXPORT_SYMBOL_GPL(work_busy);
-use_dfl_pwq:
- spin_lock_irq(&wq->dfl_pwq->pool->lock);
- get_pwq(wq->dfl_pwq);
- spin_unlock_irq(&wq->dfl_pwq->pool->lock);
- old_pwq = numa_pwq_tbl_install(wq, node, wq->dfl_pwq);
-out_unlock:
- mutex_unlock(&wq->mutex);
- put_pwq_unlocked(old_pwq);
+/**
+ * set_worker_desc - set description for the current work item
+ * @fmt: printf-style format string
+ * @...: arguments for the format string
+ *
+ * This function can be called by a running work function to describe what
+ * the work item is about. If the worker task gets dumped, this
+ * information will be printed out together to help debugging. The
+ * description can be at most WORKER_DESC_LEN including the trailing '\0'.
+ */
+void set_worker_desc(const char *fmt, ...)
+{
+ struct worker *worker = current_wq_worker();
+ va_list args;
+
+ if (worker) {
+ va_start(args, fmt);
+ vsnprintf(worker->desc, sizeof(worker->desc), fmt, args);
+ va_end(args);
+ worker->desc_valid = true;
+ }
}
-static int alloc_and_link_pwqs(struct workqueue_struct *wq)
+/**
+ * print_worker_info - print out worker information and description
+ * @log_lvl: the log level to use when printing
+ * @task: target task
+ *
+ * If @task is a worker and currently executing a work item, print out the
+ * name of the workqueue being serviced and worker description set with
+ * set_worker_desc() by the currently executing work item.
+ *
+ * This function can be safely called on any task as long as the
+ * task_struct itself is accessible. While safe, this function isn't
+ * synchronized and may print out mixups or garbages of limited length.
+ */
+void print_worker_info(const char *log_lvl, struct task_struct *task)
{
- bool highpri = wq->flags & WQ_HIGHPRI;
- int cpu, ret;
+ work_func_t *fn = NULL;
+ char name[WQ_NAME_LEN] = { };
+ char desc[WORKER_DESC_LEN] = { };
+ struct pool_workqueue *pwq = NULL;
+ struct workqueue_struct *wq = NULL;
+ bool desc_valid = false;
+ struct worker *worker;
- if (!(wq->flags & WQ_UNBOUND)) {
- wq->cpu_pwqs = alloc_percpu(struct pool_workqueue);
- if (!wq->cpu_pwqs)
- return -ENOMEM;
+ if (!(task->flags & PF_WQ_WORKER))
+ return;
- for_each_possible_cpu(cpu) {
- struct pool_workqueue *pwq =
- per_cpu_ptr(wq->cpu_pwqs, cpu);
- struct worker_pool *cpu_pools =
- per_cpu(cpu_worker_pools, cpu);
+ /*
+ * This function is called without any synchronization and @task
+ * could be in any state. Be careful with dereferences.
+ */
+ worker = probe_kthread_data(task);
- init_pwq(pwq, wq, &cpu_pools[highpri]);
+ /*
+ * Carefully copy the associated workqueue's workfn and name. Keep
+ * the original last '\0' in case the original contains garbage.
+ */
+ probe_kernel_read(&fn, &worker->current_func, sizeof(fn));
+ probe_kernel_read(&pwq, &worker->current_pwq, sizeof(pwq));
+ probe_kernel_read(&wq, &pwq->wq, sizeof(wq));
+ probe_kernel_read(name, wq->name, sizeof(name) - 1);
- mutex_lock(&wq->mutex);
- link_pwq(pwq);
- mutex_unlock(&wq->mutex);
- }
- return 0;
- } else if (wq->flags & __WQ_ORDERED) {
- ret = apply_workqueue_attrs(wq, ordered_wq_attrs[highpri]);
- /* there should only be single pwq for ordering guarantee */
- WARN(!ret && (wq->pwqs.next != &wq->dfl_pwq->pwqs_node ||
- wq->pwqs.prev != &wq->dfl_pwq->pwqs_node),
- "ordering guarantee broken for workqueue %s\n", wq->name);
- return ret;
- } else {
- return apply_workqueue_attrs(wq, unbound_std_wq_attrs[highpri]);
+ /* copy worker description */
+ probe_kernel_read(&desc_valid, &worker->desc_valid, sizeof(desc_valid));
+ if (desc_valid)
+ probe_kernel_read(desc, worker->desc, sizeof(desc) - 1);
+
+ if (fn || name[0] || desc[0]) {
+ printk("%sWorkqueue: %s %pf", log_lvl, name, fn);
+ if (desc[0])
+ pr_cont(" (%s)", desc);
+ pr_cont("\n");
}
}
-static int wq_clamp_max_active(int max_active, unsigned int flags,
- const char *name)
+static void pr_cont_pool_info(struct worker_pool *pool)
{
- int lim = flags & WQ_UNBOUND ? WQ_UNBOUND_MAX_ACTIVE : WQ_MAX_ACTIVE;
+ pr_cont(" cpus=%*pbl", nr_cpumask_bits, pool->attrs->cpumask);
+ if (pool->node != NUMA_NO_NODE)
+ pr_cont(" node=%d", pool->node);
+ pr_cont(" flags=0x%x nice=%d", pool->flags, pool->attrs->nice);
+}
- if (max_active < 1 || max_active > lim)
- pr_warn("workqueue: max_active %d requested for %s is out of range, clamping between %d and %d\n",
- max_active, name, 1, lim);
+static void pr_cont_work(bool comma, struct work_struct *work)
+{
+ if (work->func == wq_barrier_func) {
+ struct wq_barrier *barr;
- return clamp_val(max_active, 1, lim);
+ barr = container_of(work, struct wq_barrier, work);
+
+ pr_cont("%s BAR(%d)", comma ? "," : "",
+ task_pid_nr(barr->task));
+ } else {
+ pr_cont("%s %pf", comma ? "," : "", work->func);
+ }
}
-struct workqueue_struct *__alloc_workqueue_key(const char *fmt,
- unsigned int flags,
- int max_active,
- struct lock_class_key *key,
- const char *lock_name, ...)
+static void show_pwq(struct pool_workqueue *pwq)
{
- size_t tbl_size = 0;
- va_list args;
- struct workqueue_struct *wq;
- struct pool_workqueue *pwq;
-
- /* see the comment above the definition of WQ_POWER_EFFICIENT */
- if ((flags & WQ_POWER_EFFICIENT) && wq_power_efficient)
- flags |= WQ_UNBOUND;
+ struct worker_pool *pool = pwq->pool;
+ struct work_struct *work;
+ struct worker *worker;
+ bool has_in_flight = false, has_pending = false;
+ int bkt;
- /* allocate wq and format name */
- if (flags & WQ_UNBOUND)
- tbl_size = nr_node_ids * sizeof(wq->numa_pwq_tbl[0]);
+ pr_info(" pwq %d:", pool->id);
+ pr_cont_pool_info(pool);
- wq = kzalloc(sizeof(*wq) + tbl_size, GFP_KERNEL);
- if (!wq)
- return NULL;
+ pr_cont(" active=%d/%d%s\n", pwq->nr_active, pwq->max_active,
+ !list_empty(&pwq->mayday_node) ? " MAYDAY" : "");
- if (flags & WQ_UNBOUND) {
- wq->unbound_attrs = alloc_workqueue_attrs(GFP_KERNEL);
- if (!wq->unbound_attrs)
- goto err_free_wq;
+ hash_for_each(pool->busy_hash, bkt, worker, hentry) {
+ if (worker->current_pwq == pwq) {
+ has_in_flight = true;
+ break;
+ }
}
+ if (has_in_flight) {
+ bool comma = false;
- va_start(args, lock_name);
- vsnprintf(wq->name, sizeof(wq->name), fmt, args);
- va_end(args);
-
- max_active = max_active ?: WQ_DFL_ACTIVE;
- max_active = wq_clamp_max_active(max_active, flags, wq->name);
+ pr_info(" in-flight:");
+ hash_for_each(pool->busy_hash, bkt, worker, hentry) {
+ if (worker->current_pwq != pwq)
+ continue;
- /* init wq */
- wq->flags = flags;
- wq->saved_max_active = max_active;
- mutex_init(&wq->mutex);
- atomic_set(&wq->nr_pwqs_to_flush, 0);
- INIT_LIST_HEAD(&wq->pwqs);
- INIT_LIST_HEAD(&wq->flusher_queue);
- INIT_LIST_HEAD(&wq->flusher_overflow);
- INIT_LIST_HEAD(&wq->maydays);
+ pr_cont("%s %d%s:%pf", comma ? "," : "",
+ task_pid_nr(worker->task),
+ worker == pwq->wq->rescuer ? "(RESCUER)" : "",
+ worker->current_func);
+ list_for_each_entry(work, &worker->scheduled, entry)
+ pr_cont_work(false, work);
+ comma = true;
+ }
+ pr_cont("\n");
+ }
- lockdep_init_map(&wq->lockdep_map, lock_name, key, 0);
- INIT_LIST_HEAD(&wq->list);
+ list_for_each_entry(work, &pool->worklist, entry) {
+ if (get_work_pwq(work) == pwq) {
+ has_pending = true;
+ break;
+ }
+ }
+ if (has_pending) {
+ bool comma = false;
- if (alloc_and_link_pwqs(wq) < 0)
- goto err_free_wq;
+ pr_info(" pending:");
+ list_for_each_entry(work, &pool->worklist, entry) {
+ if (get_work_pwq(work) != pwq)
+ continue;
- /*
- * Workqueues which may be used during memory reclaim should
- * have a rescuer to guarantee forward progress.
- */
- if (flags & WQ_MEM_RECLAIM) {
- struct worker *rescuer;
+ pr_cont_work(comma, work);
+ comma = !(*work_data_bits(work) & WORK_STRUCT_LINKED);
+ }
+ pr_cont("\n");
+ }
- rescuer = alloc_worker(NUMA_NO_NODE);
- if (!rescuer)
- goto err_destroy;
+ if (!list_empty(&pwq->delayed_works)) {
+ bool comma = false;
- rescuer->rescue_wq = wq;
- rescuer->task = kthread_create(rescuer_thread, rescuer, "%s",
- wq->name);
- if (IS_ERR(rescuer->task)) {
- kfree(rescuer);
- goto err_destroy;
+ pr_info(" delayed:");
+ list_for_each_entry(work, &pwq->delayed_works, entry) {
+ pr_cont_work(comma, work);
+ comma = !(*work_data_bits(work) & WORK_STRUCT_LINKED);
}
-
- wq->rescuer = rescuer;
- rescuer->task->flags |= PF_NO_SETAFFINITY;
- wake_up_process(rescuer->task);
+ pr_cont("\n");
}
+}
- if ((wq->flags & WQ_SYSFS) && workqueue_sysfs_register(wq))
- goto err_destroy;
+/**
+ * show_workqueue_state - dump workqueue state
+ *
+ * Called from a sysrq handler and prints out all busy workqueues and
+ * pools.
+ */
+void show_workqueue_state(void)
+{
+ struct workqueue_struct *wq;
+ struct worker_pool *pool;
+ unsigned long flags;
+ int pi;
- /*
- * wq_pool_mutex protects global freeze state and workqueues list.
- * Grab it, adjust max_active and add the new @wq to workqueues
- * list.
- */
- mutex_lock(&wq_pool_mutex);
+ rcu_read_lock_sched();
- mutex_lock(&wq->mutex);
- for_each_pwq(pwq, wq)
- pwq_adjust_max_active(pwq);
- mutex_unlock(&wq->mutex);
+ pr_info("Showing busy workqueues and worker pools:\n");
- list_add(&wq->list, &workqueues);
+ list_for_each_entry_rcu(wq, &workqueues, list) {
+ struct pool_workqueue *pwq;
+ bool idle = true;
- mutex_unlock(&wq_pool_mutex);
+ for_each_pwq(pwq, wq) {
+ if (pwq->nr_active || !list_empty(&pwq->delayed_works)) {
+ idle = false;
+ break;
+ }
+ }
+ if (idle)
+ continue;
- return wq;
+ pr_info("workqueue %s: flags=0x%x\n", wq->name, wq->flags);
-err_free_wq:
- free_workqueue_attrs(wq->unbound_attrs);
- kfree(wq);
- return NULL;
-err_destroy:
- destroy_workqueue(wq);
- return NULL;
+ for_each_pwq(pwq, wq) {
+ spin_lock_irqsave(&pwq->pool->lock, flags);
+ if (pwq->nr_active || !list_empty(&pwq->delayed_works))
+ show_pwq(pwq);
+ spin_unlock_irqrestore(&pwq->pool->lock, flags);
+ }
+ }
+
+ for_each_pool(pool, pi) {
+ struct worker *worker;
+ bool first = true;
+
+ spin_lock_irqsave(&pool->lock, flags);
+ if (pool->nr_workers == pool->nr_idle)
+ goto next_pool;
+
+ pr_info("pool %d:", pool->id);
+ pr_cont_pool_info(pool);
+ pr_cont(" workers=%d", pool->nr_workers);
+ if (pool->manager)
+ pr_cont(" manager: %d",
+ task_pid_nr(pool->manager->task));
+ list_for_each_entry(worker, &pool->idle_list, entry) {
+ pr_cont(" %s%d", first ? "idle: " : "",
+ task_pid_nr(worker->task));
+ first = false;
+ }
+ pr_cont("\n");
+ next_pool:
+ spin_unlock_irqrestore(&pool->lock, flags);
+ }
+
+ rcu_read_unlock_sched();
}
-EXPORT_SYMBOL_GPL(__alloc_workqueue_key);
-/**
- * destroy_workqueue - safely terminate a workqueue
- * @wq: target workqueue
+/*
+ * CPU hotplug.
*
- * Safely destroy a workqueue. All work currently pending will be done first.
+ * There are two challenges in supporting CPU hotplug. Firstly, there
+ * are a lot of assumptions on strong associations among work, pwq and
+ * pool which make migrating pending and scheduled works very
+ * difficult to implement without impacting hot paths. Secondly,
+ * worker pools serve mix of short, long and very long running works making
+ * blocked draining impractical.
+ *
+ * This is solved by allowing the pools to be disassociated from the CPU
+ * running as an unbound one and allowing it to be reattached later if the
+ * cpu comes back online.
*/
-void destroy_workqueue(struct workqueue_struct *wq)
-{
- struct pool_workqueue *pwq;
- int node;
-
- /* drain it before proceeding with destruction */
- drain_workqueue(wq);
-
- /* sanity checks */
- mutex_lock(&wq->mutex);
- for_each_pwq(pwq, wq) {
- int i;
- for (i = 0; i < WORK_NR_COLORS; i++) {
- if (WARN_ON(pwq->nr_in_flight[i])) {
- mutex_unlock(&wq->mutex);
- return;
- }
- }
+static void wq_unbind_fn(struct work_struct *work)
+{
+ int cpu = smp_processor_id();
+ struct worker_pool *pool;
+ struct worker *worker;
- if (WARN_ON((pwq != wq->dfl_pwq) && (pwq->refcnt > 1)) ||
- WARN_ON(pwq->nr_active) ||
- WARN_ON(!list_empty(&pwq->delayed_works))) {
- mutex_unlock(&wq->mutex);
- return;
- }
- }
- mutex_unlock(&wq->mutex);
+ for_each_cpu_worker_pool(pool, cpu) {
+ mutex_lock(&pool->attach_mutex);
+ spin_lock_irq(&pool->lock);
- /*
- * wq list is used to freeze wq, remove from list after
- * flushing is complete in case freeze races us.
- */
- mutex_lock(&wq_pool_mutex);
- list_del_init(&wq->list);
- mutex_unlock(&wq_pool_mutex);
+ /*
+ * We've blocked all attach/detach operations. Make all workers
+ * unbound and set DISASSOCIATED. Before this, all workers
+ * except for the ones which are still executing works from
+ * before the last CPU down must be on the cpu. After
+ * this, they may become diasporas.
+ */
+ for_each_pool_worker(worker, pool)
+ worker->flags |= WORKER_UNBOUND;
- workqueue_sysfs_unregister(wq);
+ pool->flags |= POOL_DISASSOCIATED;
- if (wq->rescuer) {
- kthread_stop(wq->rescuer->task);
- kfree(wq->rescuer);
- wq->rescuer = NULL;
- }
+ spin_unlock_irq(&pool->lock);
+ mutex_unlock(&pool->attach_mutex);
- if (!(wq->flags & WQ_UNBOUND)) {
/*
- * The base ref is never dropped on per-cpu pwqs. Directly
- * free the pwqs and wq.
+ * Call schedule() so that we cross rq->lock and thus can
+ * guarantee sched callbacks see the %WORKER_UNBOUND flag.
+ * This is necessary as scheduler callbacks may be invoked
+ * from other cpus.
*/
- free_percpu(wq->cpu_pwqs);
- kfree(wq);
- } else {
+ schedule();
+
/*
- * We're the sole accessor of @wq at this point. Directly
- * access numa_pwq_tbl[] and dfl_pwq to put the base refs.
- * @wq will be freed when the last pwq is released.
+ * Sched callbacks are disabled now. Zap nr_running.
+ * After this, nr_running stays zero and need_more_worker()
+ * and keep_working() are always true as long as the
+ * worklist is not empty. This pool now behaves as an
+ * unbound (in terms of concurrency management) pool which
+ * are served by workers tied to the pool.
*/
- for_each_node(node) {
- pwq = rcu_access_pointer(wq->numa_pwq_tbl[node]);
- RCU_INIT_POINTER(wq->numa_pwq_tbl[node], NULL);
- put_pwq_unlocked(pwq);
- }
+ atomic_set(&pool->nr_running, 0);
/*
- * Put dfl_pwq. @wq may be freed any time after dfl_pwq is
- * put. Don't access it afterwards.
+ * With concurrency management just turned off, a busy
+ * worker blocking could lead to lengthy stalls. Kick off
+ * unbound chain execution of currently pending work items.
*/
- pwq = wq->dfl_pwq;
- wq->dfl_pwq = NULL;
- put_pwq_unlocked(pwq);
+ spin_lock_irq(&pool->lock);
+ wake_up_worker(pool);
+ spin_unlock_irq(&pool->lock);
}
}
-EXPORT_SYMBOL_GPL(destroy_workqueue);
/**
- * workqueue_set_max_active - adjust max_active of a workqueue
- * @wq: target workqueue
- * @max_active: new max_active value.
- *
- * Set max_active of @wq to @max_active.
+ * rebind_workers - rebind all workers of a pool to the associated CPU
+ * @pool: pool of interest
*
- * CONTEXT:
- * Don't call from IRQ context.
+ * @pool->cpu is coming online. Rebind all workers to the CPU.
*/
-void workqueue_set_max_active(struct workqueue_struct *wq, int max_active)
+static void rebind_workers(struct worker_pool *pool)
{
- struct pool_workqueue *pwq;
+ struct worker *worker;
- /* disallow meddling with max_active for ordered workqueues */
- if (WARN_ON(wq->flags & __WQ_ORDERED))
- return;
+ lockdep_assert_held(&pool->attach_mutex);
- max_active = wq_clamp_max_active(max_active, wq->flags, wq->name);
+ /*
+ * Restore CPU affinity of all workers. As all idle workers should
+ * be on the run-queue of the associated CPU before any local
+ * wake-ups for concurrency management happen, restore CPU affinty
+ * of all workers first and then clear UNBOUND. As we're called
+ * from CPU_ONLINE, the following shouldn't fail.
+ */
+ for_each_pool_worker(worker, pool)
+ WARN_ON_ONCE(set_cpus_allowed_ptr(worker->task,
+ pool->attrs->cpumask) < 0);
- mutex_lock(&wq->mutex);
+ spin_lock_irq(&pool->lock);
+ pool->flags &= ~POOL_DISASSOCIATED;
- wq->saved_max_active = max_active;
+ for_each_pool_worker(worker, pool) {
+ unsigned int worker_flags = worker->flags;
- for_each_pwq(pwq, wq)
- pwq_adjust_max_active(pwq);
+ /*
+ * A bound idle worker should actually be on the runqueue
+ * of the associated CPU for local wake-ups targeting it to
+ * work. Kick all idle workers so that they migrate to the
+ * associated CPU. Doing this in the same loop as
+ * replacing UNBOUND with REBOUND is safe as no worker will
+ * be bound before @pool->lock is released.
+ */
+ if (worker_flags & WORKER_IDLE)
+ wake_up_process(worker->task);
- mutex_unlock(&wq->mutex);
+ /*
+ * We want to clear UNBOUND but can't directly call
+ * worker_clr_flags() or adjust nr_running. Atomically
+ * replace UNBOUND with another NOT_RUNNING flag REBOUND.
+ * @worker will clear REBOUND using worker_clr_flags() when
+ * it initiates the next execution cycle thus restoring
+ * concurrency management. Note that when or whether
+ * @worker clears REBOUND doesn't affect correctness.
+ *
+ * ACCESS_ONCE() is necessary because @worker->flags may be
+ * tested without holding any lock in
+ * wq_worker_waking_up(). Without it, NOT_RUNNING test may
+ * fail incorrectly leading to premature concurrency
+ * management operations.
+ */
+ WARN_ON_ONCE(!(worker_flags & WORKER_UNBOUND));
+ worker_flags |= WORKER_REBOUND;
+ worker_flags &= ~WORKER_UNBOUND;
+ ACCESS_ONCE(worker->flags) = worker_flags;
+ }
+
+ spin_unlock_irq(&pool->lock);
}
-EXPORT_SYMBOL_GPL(workqueue_set_max_active);
/**
- * current_is_workqueue_rescuer - is %current workqueue rescuer?
- *
- * Determine whether %current is a workqueue rescuer. Can be used from
- * work functions to determine whether it's being run off the rescuer task.
+ * restore_unbound_workers_cpumask - restore cpumask of unbound workers
+ * @pool: unbound pool of interest
+ * @cpu: the CPU which is coming up
*
- * Return: %true if %current is a workqueue rescuer. %false otherwise.
+ * An unbound pool may end up with a cpumask which doesn't have any online
+ * CPUs. When a worker of such pool get scheduled, the scheduler resets
+ * its cpus_allowed. If @cpu is in @pool's cpumask which didn't have any
+ * online CPU before, cpus_allowed of all its workers should be restored.
*/
-bool current_is_workqueue_rescuer(void)
+static void restore_unbound_workers_cpumask(struct worker_pool *pool, int cpu)
{
- struct worker *worker = current_wq_worker();
+ static cpumask_t cpumask;
+ struct worker *worker;
- return worker && worker->rescue_wq;
+ lockdep_assert_held(&pool->attach_mutex);
+
+ /* is @cpu allowed for @pool? */
+ if (!cpumask_test_cpu(cpu, pool->attrs->cpumask))
+ return;
+
+ /* is @cpu the only online CPU? */
+ cpumask_and(&cpumask, pool->attrs->cpumask, cpu_online_mask);
+ if (cpumask_weight(&cpumask) != 1)
+ return;
+
+ /* as we're called from CPU_ONLINE, the following shouldn't fail */
+ for_each_pool_worker(worker, pool)
+ WARN_ON_ONCE(set_cpus_allowed_ptr(worker->task,
+ pool->attrs->cpumask) < 0);
}
-/**
- * workqueue_congested - test whether a workqueue is congested
- * @cpu: CPU in question
- * @wq: target workqueue
- *
- * Test whether @wq's cpu workqueue for @cpu is congested. There is
- * no synchronization around this function and the test result is
- * unreliable and only useful as advisory hints or for debugging.
- *
- * If @cpu is WORK_CPU_UNBOUND, the test is performed on the local CPU.
- * Note that both per-cpu and unbound workqueues may be associated with
- * multiple pool_workqueues which have separate congested states. A
- * workqueue being congested on one CPU doesn't mean the workqueue is also
- * contested on other CPUs / NUMA nodes.
- *
- * Return:
- * %true if congested, %false otherwise.
+/*
+ * Workqueues should be brought up before normal priority CPU notifiers.
+ * This will be registered high priority CPU notifier.
*/
-bool workqueue_congested(int cpu, struct workqueue_struct *wq)
+static int workqueue_cpu_up_callback(struct notifier_block *nfb,
+ unsigned long action,
+ void *hcpu)
{
- struct pool_workqueue *pwq;
- bool ret;
+ int cpu = (unsigned long)hcpu;
+ struct worker_pool *pool;
+ struct workqueue_struct *wq;
+ int pi;
- rcu_read_lock_sched();
+ switch (action & ~CPU_TASKS_FROZEN) {
+ case CPU_UP_PREPARE:
+ for_each_cpu_worker_pool(pool, cpu) {
+ if (pool->nr_workers)
+ continue;
+ if (!create_worker(pool))
+ return NOTIFY_BAD;
+ }
+ break;
- if (cpu == WORK_CPU_UNBOUND)
- cpu = smp_processor_id();
+ case CPU_DOWN_FAILED:
+ case CPU_ONLINE:
+ mutex_lock(&wq_pool_mutex);
- if (!(wq->flags & WQ_UNBOUND))
- pwq = per_cpu_ptr(wq->cpu_pwqs, cpu);
- else
- pwq = unbound_pwq_by_node(wq, cpu_to_node(cpu));
+ for_each_pool(pool, pi) {
+ mutex_lock(&pool->attach_mutex);
- ret = !list_empty(&pwq->delayed_works);
- rcu_read_unlock_sched();
+ if (pool->cpu == cpu)
+ rebind_workers(pool);
+ else if (pool->cpu < 0)
+ restore_unbound_workers_cpumask(pool, cpu);
- return ret;
+ mutex_unlock(&pool->attach_mutex);
+ }
+
+ /* update NUMA affinity of unbound workqueues */
+ list_for_each_entry(wq, &workqueues, list)
+ wq_update_unbound_numa(wq, cpu, true);
+
+ mutex_unlock(&wq_pool_mutex);
+ break;
+ }
+ return NOTIFY_OK;
}
-EXPORT_SYMBOL_GPL(workqueue_congested);
-/**
- * work_busy - test whether a work is currently pending or running
- * @work: the work to be tested
- *
- * Test whether @work is currently pending or running. There is no
- * synchronization around this function and the test result is
- * unreliable and only useful as advisory hints or for debugging.
- *
- * Return:
- * OR'd bitmask of WORK_BUSY_* bits.
+/*
+ * Workqueues should be brought down after normal priority CPU notifiers.
+ * This will be registered as low priority CPU notifier.
*/
-unsigned int work_busy(struct work_struct *work)
+static int workqueue_cpu_down_callback(struct notifier_block *nfb,
+ unsigned long action,
+ void *hcpu)
{
- struct worker_pool *pool;
- unsigned long flags;
- unsigned int ret = 0;
+ int cpu = (unsigned long)hcpu;
+ struct work_struct unbind_work;
+ struct workqueue_struct *wq;
- if (work_pending(work))
- ret |= WORK_BUSY_PENDING;
+ switch (action & ~CPU_TASKS_FROZEN) {
+ case CPU_DOWN_PREPARE:
+ /* unbinding per-cpu workers should happen on the local CPU */
+ INIT_WORK_ONSTACK(&unbind_work, wq_unbind_fn);
+ queue_work_on(cpu, system_highpri_wq, &unbind_work);
- local_irq_save(flags);
- pool = get_work_pool(work);
- if (pool) {
- spin_lock(&pool->lock);
- if (find_worker_executing_work(pool, work))
- ret |= WORK_BUSY_RUNNING;
- spin_unlock(&pool->lock);
+ /* update NUMA affinity of unbound workqueues */
+ mutex_lock(&wq_pool_mutex);
+ list_for_each_entry(wq, &workqueues, list)
+ wq_update_unbound_numa(wq, cpu, false);
+ mutex_unlock(&wq_pool_mutex);
+
+ /* wait for per-cpu unbinding to finish */
+ flush_work(&unbind_work);
+ destroy_work_on_stack(&unbind_work);
+ break;
}
- local_irq_restore(flags);
+ return NOTIFY_OK;
+}
- return ret;
+#ifdef CONFIG_SMP
+
+struct work_for_cpu {
+ struct work_struct work;
+ long (*fn)(void *);
+ void *arg;
+ long ret;
+};
+
+static void work_for_cpu_fn(struct work_struct *work)
+{
+ struct work_for_cpu *wfc = container_of(work, struct work_for_cpu, work);
+
+ wfc->ret = wfc->fn(wfc->arg);
}
-EXPORT_SYMBOL_GPL(work_busy);
/**
- * set_worker_desc - set description for the current work item
- * @fmt: printf-style format string
- * @...: arguments for the format string
+ * work_on_cpu - run a function in user context on a particular cpu
+ * @cpu: the cpu to run on
+ * @fn: the function to run
+ * @arg: the function arg
*
- * This function can be called by a running work function to describe what
- * the work item is about. If the worker task gets dumped, this
- * information will be printed out together to help debugging. The
- * description can be at most WORKER_DESC_LEN including the trailing '\0'.
+ * It is up to the caller to ensure that the cpu doesn't go offline.
+ * The caller must not hold any locks which would prevent @fn from completing.
+ *
+ * Return: The value @fn returns.
*/
-void set_worker_desc(const char *fmt, ...)
+long work_on_cpu(int cpu, long (*fn)(void *), void *arg)
{
- struct worker *worker = current_wq_worker();
- va_list args;
+ struct work_for_cpu wfc = { .fn = fn, .arg = arg };
- if (worker) {
- va_start(args, fmt);
- vsnprintf(worker->desc, sizeof(worker->desc), fmt, args);
- va_end(args);
- worker->desc_valid = true;
- }
+ INIT_WORK_ONSTACK(&wfc.work, work_for_cpu_fn);
+ schedule_work_on(cpu, &wfc.work);
+ flush_work(&wfc.work);
+ destroy_work_on_stack(&wfc.work);
+ return wfc.ret;
}
+EXPORT_SYMBOL_GPL(work_on_cpu);
+#endif /* CONFIG_SMP */
+
+#ifdef CONFIG_FREEZER
/**
- * print_worker_info - print out worker information and description
- * @log_lvl: the log level to use when printing
- * @task: target task
+ * freeze_workqueues_begin - begin freezing workqueues
*
- * If @task is a worker and currently executing a work item, print out the
- * name of the workqueue being serviced and worker description set with
- * set_worker_desc() by the currently executing work item.
+ * Start freezing workqueues. After this function returns, all freezable
+ * workqueues will queue new works to their delayed_works list instead of
+ * pool->worklist.
*
- * This function can be safely called on any task as long as the
- * task_struct itself is accessible. While safe, this function isn't
- * synchronized and may print out mixups or garbages of limited length.
+ * CONTEXT:
+ * Grabs and releases wq_pool_mutex, wq->mutex and pool->lock's.
*/
-void print_worker_info(const char *log_lvl, struct task_struct *task)
+void freeze_workqueues_begin(void)
{
- work_func_t *fn = NULL;
- char name[WQ_NAME_LEN] = { };
- char desc[WORKER_DESC_LEN] = { };
- struct pool_workqueue *pwq = NULL;
- struct workqueue_struct *wq = NULL;
- bool desc_valid = false;
- struct worker *worker;
-
- if (!(task->flags & PF_WQ_WORKER))
- return;
-
- /*
- * This function is called without any synchronization and @task
- * could be in any state. Be careful with dereferences.
- */
- worker = probe_kthread_data(task);
+ struct workqueue_struct *wq;
+ struct pool_workqueue *pwq;
- /*
- * Carefully copy the associated workqueue's workfn and name. Keep
- * the original last '\0' in case the original contains garbage.
- */
- probe_kernel_read(&fn, &worker->current_func, sizeof(fn));
- probe_kernel_read(&pwq, &worker->current_pwq, sizeof(pwq));
- probe_kernel_read(&wq, &pwq->wq, sizeof(wq));
- probe_kernel_read(name, wq->name, sizeof(name) - 1);
+ mutex_lock(&wq_pool_mutex);
- /* copy worker description */
- probe_kernel_read(&desc_valid, &worker->desc_valid, sizeof(desc_valid));
- if (desc_valid)
- probe_kernel_read(desc, worker->desc, sizeof(desc) - 1);
+ WARN_ON_ONCE(workqueue_freezing);
+ workqueue_freezing = true;
- if (fn || name[0] || desc[0]) {
- printk("%sWorkqueue: %s %pf", log_lvl, name, fn);
- if (desc[0])
- pr_cont(" (%s)", desc);
- pr_cont("\n");
+ list_for_each_entry(wq, &workqueues, list) {
+ mutex_lock(&wq->mutex);
+ for_each_pwq(pwq, wq)
+ pwq_adjust_max_active(pwq);
+ mutex_unlock(&wq->mutex);
}
+
+ mutex_unlock(&wq_pool_mutex);
}
-/*
- * CPU hotplug.
+/**
+ * freeze_workqueues_busy - are freezable workqueues still busy?
*
- * There are two challenges in supporting CPU hotplug. Firstly, there
- * are a lot of assumptions on strong associations among work, pwq and
- * pool which make migrating pending and scheduled works very
- * difficult to implement without impacting hot paths. Secondly,
- * worker pools serve mix of short, long and very long running works making
- * blocked draining impractical.
+ * Check whether freezing is complete. This function must be called
+ * between freeze_workqueues_begin() and thaw_workqueues().
*
- * This is solved by allowing the pools to be disassociated from the CPU
- * running as an unbound one and allowing it to be reattached later if the
- * cpu comes back online.
+ * CONTEXT:
+ * Grabs and releases wq_pool_mutex.
+ *
+ * Return:
+ * %true if some freezable workqueues are still busy. %false if freezing
+ * is complete.
*/
-
-static void wq_unbind_fn(struct work_struct *work)
+bool freeze_workqueues_busy(void)
{
- int cpu = smp_processor_id();
- struct worker_pool *pool;
- struct worker *worker;
+ bool busy = false;
+ struct workqueue_struct *wq;
+ struct pool_workqueue *pwq;
- for_each_cpu_worker_pool(pool, cpu) {
- mutex_lock(&pool->attach_mutex);
- spin_lock_irq(&pool->lock);
+ mutex_lock(&wq_pool_mutex);
+
+ WARN_ON_ONCE(!workqueue_freezing);
+ list_for_each_entry(wq, &workqueues, list) {
+ if (!(wq->flags & WQ_FREEZABLE))
+ continue;
/*
- * We've blocked all attach/detach operations. Make all workers
- * unbound and set DISASSOCIATED. Before this, all workers
- * except for the ones which are still executing works from
- * before the last CPU down must be on the cpu. After
- * this, they may become diasporas.
+ * nr_active is monotonically decreasing. It's safe
+ * to peek without lock.
*/
- for_each_pool_worker(worker, pool)
- worker->flags |= WORKER_UNBOUND;
+ rcu_read_lock_sched();
+ for_each_pwq(pwq, wq) {
+ WARN_ON_ONCE(pwq->nr_active < 0);
+ if (pwq->nr_active) {
+ busy = true;
+ rcu_read_unlock_sched();
+ goto out_unlock;
+ }
+ }
+ rcu_read_unlock_sched();
+ }
+out_unlock:
+ mutex_unlock(&wq_pool_mutex);
+ return busy;
+}
- pool->flags |= POOL_DISASSOCIATED;
+/**
+ * thaw_workqueues - thaw workqueues
+ *
+ * Thaw workqueues. Normal queueing is restored and all collected
+ * frozen works are transferred to their respective pool worklists.
+ *
+ * CONTEXT:
+ * Grabs and releases wq_pool_mutex, wq->mutex and pool->lock's.
+ */
+void thaw_workqueues(void)
+{
+ struct workqueue_struct *wq;
+ struct pool_workqueue *pwq;
- spin_unlock_irq(&pool->lock);
- mutex_unlock(&pool->attach_mutex);
+ mutex_lock(&wq_pool_mutex);
- /*
- * Call schedule() so that we cross rq->lock and thus can
- * guarantee sched callbacks see the %WORKER_UNBOUND flag.
- * This is necessary as scheduler callbacks may be invoked
- * from other cpus.
- */
- schedule();
+ if (!workqueue_freezing)
+ goto out_unlock;
- /*
- * Sched callbacks are disabled now. Zap nr_running.
- * After this, nr_running stays zero and need_more_worker()
- * and keep_working() are always true as long as the
- * worklist is not empty. This pool now behaves as an
- * unbound (in terms of concurrency management) pool which
- * are served by workers tied to the pool.
- */
- atomic_set(&pool->nr_running, 0);
+ workqueue_freezing = false;
- /*
- * With concurrency management just turned off, a busy
- * worker blocking could lead to lengthy stalls. Kick off
- * unbound chain execution of currently pending work items.
- */
- spin_lock_irq(&pool->lock);
- wake_up_worker(pool);
- spin_unlock_irq(&pool->lock);
+ /* restore max_active and repopulate worklist */
+ list_for_each_entry(wq, &workqueues, list) {
+ mutex_lock(&wq->mutex);
+ for_each_pwq(pwq, wq)
+ pwq_adjust_max_active(pwq);
+ mutex_unlock(&wq->mutex);
}
+
+out_unlock:
+ mutex_unlock(&wq_pool_mutex);
}
+#endif /* CONFIG_FREEZER */
-/**
- * rebind_workers - rebind all workers of a pool to the associated CPU
- * @pool: pool of interest
+#ifdef CONFIG_SYSFS
+/*
+ * Workqueues with WQ_SYSFS flag set is visible to userland via
+ * /sys/bus/workqueue/devices/WQ_NAME. All visible workqueues have the
+ * following attributes.
*
- * @pool->cpu is coming online. Rebind all workers to the CPU.
+ * per_cpu RO bool : whether the workqueue is per-cpu or unbound
+ * max_active RW int : maximum number of in-flight work items
+ *
+ * Unbound workqueues have the following extra attributes.
+ *
+ * id RO int : the associated pool ID
+ * nice RW int : nice value of the workers
+ * cpumask RW mask : bitmask of allowed CPUs for the workers
*/
-static void rebind_workers(struct worker_pool *pool)
+struct wq_device {
+ struct workqueue_struct *wq;
+ struct device dev;
+};
+
+static struct workqueue_struct *dev_to_wq(struct device *dev)
{
- struct worker *worker;
+ struct wq_device *wq_dev = container_of(dev, struct wq_device, dev);
- lockdep_assert_held(&pool->attach_mutex);
+ return wq_dev->wq;
+}
- /*
- * Restore CPU affinity of all workers. As all idle workers should
- * be on the run-queue of the associated CPU before any local
- * wake-ups for concurrency management happen, restore CPU affinty
- * of all workers first and then clear UNBOUND. As we're called
- * from CPU_ONLINE, the following shouldn't fail.
- */
- for_each_pool_worker(worker, pool)
- WARN_ON_ONCE(set_cpus_allowed_ptr(worker->task,
- pool->attrs->cpumask) < 0);
+static ssize_t per_cpu_show(struct device *dev, struct device_attribute *attr,
+ char *buf)
+{
+ struct workqueue_struct *wq = dev_to_wq(dev);
- spin_lock_irq(&pool->lock);
- pool->flags &= ~POOL_DISASSOCIATED;
+ return scnprintf(buf, PAGE_SIZE, "%d\n", (bool)!(wq->flags & WQ_UNBOUND));
+}
+static DEVICE_ATTR_RO(per_cpu);
- for_each_pool_worker(worker, pool) {
- unsigned int worker_flags = worker->flags;
+static ssize_t max_active_show(struct device *dev,
+ struct device_attribute *attr, char *buf)
+{
+ struct workqueue_struct *wq = dev_to_wq(dev);
- /*
- * A bound idle worker should actually be on the runqueue
- * of the associated CPU for local wake-ups targeting it to
- * work. Kick all idle workers so that they migrate to the
- * associated CPU. Doing this in the same loop as
- * replacing UNBOUND with REBOUND is safe as no worker will
- * be bound before @pool->lock is released.
- */
- if (worker_flags & WORKER_IDLE)
- wake_up_process(worker->task);
+ return scnprintf(buf, PAGE_SIZE, "%d\n", wq->saved_max_active);
+}
- /*
- * We want to clear UNBOUND but can't directly call
- * worker_clr_flags() or adjust nr_running. Atomically
- * replace UNBOUND with another NOT_RUNNING flag REBOUND.
- * @worker will clear REBOUND using worker_clr_flags() when
- * it initiates the next execution cycle thus restoring
- * concurrency management. Note that when or whether
- * @worker clears REBOUND doesn't affect correctness.
- *
- * ACCESS_ONCE() is necessary because @worker->flags may be
- * tested without holding any lock in
- * wq_worker_waking_up(). Without it, NOT_RUNNING test may
- * fail incorrectly leading to premature concurrency
- * management operations.
- */
- WARN_ON_ONCE(!(worker_flags & WORKER_UNBOUND));
- worker_flags |= WORKER_REBOUND;
- worker_flags &= ~WORKER_UNBOUND;
- ACCESS_ONCE(worker->flags) = worker_flags;
+static ssize_t max_active_store(struct device *dev,
+ struct device_attribute *attr, const char *buf,
+ size_t count)
+{
+ struct workqueue_struct *wq = dev_to_wq(dev);
+ int val;
+
+ if (sscanf(buf, "%d", &val) != 1 || val <= 0)
+ return -EINVAL;
+
+ workqueue_set_max_active(wq, val);
+ return count;
+}
+static DEVICE_ATTR_RW(max_active);
+
+static struct attribute *wq_sysfs_attrs[] = {
+ &dev_attr_per_cpu.attr,
+ &dev_attr_max_active.attr,
+ NULL,
+};
+ATTRIBUTE_GROUPS(wq_sysfs);
+
+static ssize_t wq_pool_ids_show(struct device *dev,
+ struct device_attribute *attr, char *buf)
+{
+ struct workqueue_struct *wq = dev_to_wq(dev);
+ const char *delim = "";
+ int node, written = 0;
+
+ rcu_read_lock_sched();
+ for_each_node(node) {
+ written += scnprintf(buf + written, PAGE_SIZE - written,
+ "%s%d:%d", delim, node,
+ unbound_pwq_by_node(wq, node)->pool->id);
+ delim = " ";
}
+ written += scnprintf(buf + written, PAGE_SIZE - written, "\n");
+ rcu_read_unlock_sched();
- spin_unlock_irq(&pool->lock);
+ return written;
}
-/**
- * restore_unbound_workers_cpumask - restore cpumask of unbound workers
- * @pool: unbound pool of interest
- * @cpu: the CPU which is coming up
- *
- * An unbound pool may end up with a cpumask which doesn't have any online
- * CPUs. When a worker of such pool get scheduled, the scheduler resets
- * its cpus_allowed. If @cpu is in @pool's cpumask which didn't have any
- * online CPU before, cpus_allowed of all its workers should be restored.
- */
-static void restore_unbound_workers_cpumask(struct worker_pool *pool, int cpu)
+static ssize_t wq_nice_show(struct device *dev, struct device_attribute *attr,
+ char *buf)
{
- static cpumask_t cpumask;
- struct worker *worker;
+ struct workqueue_struct *wq = dev_to_wq(dev);
+ int written;
- lockdep_assert_held(&pool->attach_mutex);
+ mutex_lock(&wq->mutex);
+ written = scnprintf(buf, PAGE_SIZE, "%d\n", wq->unbound_attrs->nice);
+ mutex_unlock(&wq->mutex);
- /* is @cpu allowed for @pool? */
- if (!cpumask_test_cpu(cpu, pool->attrs->cpumask))
- return;
+ return written;
+}
- /* is @cpu the only online CPU? */
- cpumask_and(&cpumask, pool->attrs->cpumask, cpu_online_mask);
- if (cpumask_weight(&cpumask) != 1)
- return;
+/* prepare workqueue_attrs for sysfs store operations */
+static struct workqueue_attrs *wq_sysfs_prep_attrs(struct workqueue_struct *wq)
+{
+ struct workqueue_attrs *attrs;
- /* as we're called from CPU_ONLINE, the following shouldn't fail */
- for_each_pool_worker(worker, pool)
- WARN_ON_ONCE(set_cpus_allowed_ptr(worker->task,
- pool->attrs->cpumask) < 0);
+ attrs = alloc_workqueue_attrs(GFP_KERNEL);
+ if (!attrs)
+ return NULL;
+
+ mutex_lock(&wq->mutex);
+ copy_workqueue_attrs(attrs, wq->unbound_attrs);
+ mutex_unlock(&wq->mutex);
+ return attrs;
}
-/*
- * Workqueues should be brought up before normal priority CPU notifiers.
- * This will be registered high priority CPU notifier.
- */
-static int workqueue_cpu_up_callback(struct notifier_block *nfb,
- unsigned long action,
- void *hcpu)
+static ssize_t wq_nice_store(struct device *dev, struct device_attribute *attr,
+ const char *buf, size_t count)
{
- int cpu = (unsigned long)hcpu;
- struct worker_pool *pool;
- struct workqueue_struct *wq;
- int pi;
+ struct workqueue_struct *wq = dev_to_wq(dev);
+ struct workqueue_attrs *attrs;
+ int ret;
- switch (action & ~CPU_TASKS_FROZEN) {
- case CPU_UP_PREPARE:
- for_each_cpu_worker_pool(pool, cpu) {
- if (pool->nr_workers)
- continue;
- if (!create_worker(pool))
- return NOTIFY_BAD;
- }
- break;
+ attrs = wq_sysfs_prep_attrs(wq);
+ if (!attrs)
+ return -ENOMEM;
- case CPU_DOWN_FAILED:
- case CPU_ONLINE:
- mutex_lock(&wq_pool_mutex);
+ if (sscanf(buf, "%d", &attrs->nice) == 1 &&
+ attrs->nice >= MIN_NICE && attrs->nice <= MAX_NICE)
+ ret = apply_workqueue_attrs(wq, attrs);
+ else
+ ret = -EINVAL;
- for_each_pool(pool, pi) {
- mutex_lock(&pool->attach_mutex);
+ free_workqueue_attrs(attrs);
+ return ret ?: count;
+}
- if (pool->cpu == cpu)
- rebind_workers(pool);
- else if (pool->cpu < 0)
- restore_unbound_workers_cpumask(pool, cpu);
+static ssize_t wq_cpumask_show(struct device *dev,
+ struct device_attribute *attr, char *buf)
+{
+ struct workqueue_struct *wq = dev_to_wq(dev);
+ int written;
+
+ mutex_lock(&wq->mutex);
+ written = scnprintf(buf, PAGE_SIZE, "%*pb\n",
+ cpumask_pr_args(wq->unbound_attrs->cpumask));
+ mutex_unlock(&wq->mutex);
+ return written;
+}
+
+static ssize_t wq_cpumask_store(struct device *dev,
+ struct device_attribute *attr,
+ const char *buf, size_t count)
+{
+ struct workqueue_struct *wq = dev_to_wq(dev);
+ struct workqueue_attrs *attrs;
+ int ret;
+
+ attrs = wq_sysfs_prep_attrs(wq);
+ if (!attrs)
+ return -ENOMEM;
+
+ ret = cpumask_parse(buf, attrs->cpumask);
+ if (!ret)
+ ret = apply_workqueue_attrs(wq, attrs);
+
+ free_workqueue_attrs(attrs);
+ return ret ?: count;
+}
- mutex_unlock(&pool->attach_mutex);
- }
+static ssize_t wq_numa_show(struct device *dev, struct device_attribute *attr,
+ char *buf)
+{
+ struct workqueue_struct *wq = dev_to_wq(dev);
+ int written;
- /* update NUMA affinity of unbound workqueues */
- list_for_each_entry(wq, &workqueues, list)
- wq_update_unbound_numa(wq, cpu, true);
+ mutex_lock(&wq->mutex);
+ written = scnprintf(buf, PAGE_SIZE, "%d\n",
+ !wq->unbound_attrs->no_numa);
+ mutex_unlock(&wq->mutex);
- mutex_unlock(&wq_pool_mutex);
- break;
- }
- return NOTIFY_OK;
+ return written;
}
-/*
- * Workqueues should be brought down after normal priority CPU notifiers.
- * This will be registered as low priority CPU notifier.
- */
-static int workqueue_cpu_down_callback(struct notifier_block *nfb,
- unsigned long action,
- void *hcpu)
+static ssize_t wq_numa_store(struct device *dev, struct device_attribute *attr,
+ const char *buf, size_t count)
{
- int cpu = (unsigned long)hcpu;
- struct work_struct unbind_work;
- struct workqueue_struct *wq;
-
- switch (action & ~CPU_TASKS_FROZEN) {
- case CPU_DOWN_PREPARE:
- /* unbinding per-cpu workers should happen on the local CPU */
- INIT_WORK_ONSTACK(&unbind_work, wq_unbind_fn);
- queue_work_on(cpu, system_highpri_wq, &unbind_work);
+ struct workqueue_struct *wq = dev_to_wq(dev);
+ struct workqueue_attrs *attrs;
+ int v, ret;
- /* update NUMA affinity of unbound workqueues */
- mutex_lock(&wq_pool_mutex);
- list_for_each_entry(wq, &workqueues, list)
- wq_update_unbound_numa(wq, cpu, false);
- mutex_unlock(&wq_pool_mutex);
+ attrs = wq_sysfs_prep_attrs(wq);
+ if (!attrs)
+ return -ENOMEM;
- /* wait for per-cpu unbinding to finish */
- flush_work(&unbind_work);
- destroy_work_on_stack(&unbind_work);
- break;
+ ret = -EINVAL;
+ if (sscanf(buf, "%d", &v) == 1) {
+ attrs->no_numa = !v;
+ ret = apply_workqueue_attrs(wq, attrs);
}
- return NOTIFY_OK;
+
+ free_workqueue_attrs(attrs);
+ return ret ?: count;
}
-#ifdef CONFIG_SMP
+static struct device_attribute wq_sysfs_unbound_attrs[] = {
+ __ATTR(pool_ids, 0444, wq_pool_ids_show, NULL),
+ __ATTR(nice, 0644, wq_nice_show, wq_nice_store),
+ __ATTR(cpumask, 0644, wq_cpumask_show, wq_cpumask_store),
+ __ATTR(numa, 0644, wq_numa_show, wq_numa_store),
+ __ATTR_NULL,
+};
-struct work_for_cpu {
- struct work_struct work;
- long (*fn)(void *);
- void *arg;
- long ret;
+static struct bus_type wq_subsys = {
+ .name = "workqueue",
+ .dev_groups = wq_sysfs_groups,
};
-static void work_for_cpu_fn(struct work_struct *work)
+static int __init wq_sysfs_init(void)
{
- struct work_for_cpu *wfc = container_of(work, struct work_for_cpu, work);
-
- wfc->ret = wfc->fn(wfc->arg);
+ return subsys_virtual_register(&wq_subsys, NULL);
}
+core_initcall(wq_sysfs_init);
-/**
- * work_on_cpu - run a function in user context on a particular cpu
- * @cpu: the cpu to run on
- * @fn: the function to run
- * @arg: the function arg
- *
- * It is up to the caller to ensure that the cpu doesn't go offline.
- * The caller must not hold any locks which would prevent @fn from completing.
- *
- * Return: The value @fn returns.
- */
-long work_on_cpu(int cpu, long (*fn)(void *), void *arg)
+static void wq_device_release(struct device *dev)
{
- struct work_for_cpu wfc = { .fn = fn, .arg = arg };
+ struct wq_device *wq_dev = container_of(dev, struct wq_device, dev);
- INIT_WORK_ONSTACK(&wfc.work, work_for_cpu_fn);
- schedule_work_on(cpu, &wfc.work);
- flush_work(&wfc.work);
- destroy_work_on_stack(&wfc.work);
- return wfc.ret;
+ kfree(wq_dev);
}
-EXPORT_SYMBOL_GPL(work_on_cpu);
-#endif /* CONFIG_SMP */
-
-#ifdef CONFIG_FREEZER
/**
- * freeze_workqueues_begin - begin freezing workqueues
+ * workqueue_sysfs_register - make a workqueue visible in sysfs
+ * @wq: the workqueue to register
*
- * Start freezing workqueues. After this function returns, all freezable
- * workqueues will queue new works to their delayed_works list instead of
- * pool->worklist.
+ * Expose @wq in sysfs under /sys/bus/workqueue/devices.
+ * alloc_workqueue*() automatically calls this function if WQ_SYSFS is set
+ * which is the preferred method.
*
- * CONTEXT:
- * Grabs and releases wq_pool_mutex, wq->mutex and pool->lock's.
+ * Workqueue user should use this function directly iff it wants to apply
+ * workqueue_attrs before making the workqueue visible in sysfs; otherwise,
+ * apply_workqueue_attrs() may race against userland updating the
+ * attributes.
+ *
+ * Return: 0 on success, -errno on failure.
*/
-void freeze_workqueues_begin(void)
+int workqueue_sysfs_register(struct workqueue_struct *wq)
{
- struct workqueue_struct *wq;
- struct pool_workqueue *pwq;
-
- mutex_lock(&wq_pool_mutex);
+ struct wq_device *wq_dev;
+ int ret;
- WARN_ON_ONCE(workqueue_freezing);
- workqueue_freezing = true;
+ /*
+ * Adjusting max_active or creating new pwqs by applyting
+ * attributes breaks ordering guarantee. Disallow exposing ordered
+ * workqueues.
+ */
+ if (WARN_ON(wq->flags & __WQ_ORDERED))
+ return -EINVAL;
- list_for_each_entry(wq, &workqueues, list) {
- mutex_lock(&wq->mutex);
- for_each_pwq(pwq, wq)
- pwq_adjust_max_active(pwq);
- mutex_unlock(&wq->mutex);
- }
+ wq->wq_dev = wq_dev = kzalloc(sizeof(*wq_dev), GFP_KERNEL);
+ if (!wq_dev)
+ return -ENOMEM;
- mutex_unlock(&wq_pool_mutex);
-}
+ wq_dev->wq = wq;
+ wq_dev->dev.bus = &wq_subsys;
+ wq_dev->dev.init_name = wq->name;
+ wq_dev->dev.release = wq_device_release;
-/**
- * freeze_workqueues_busy - are freezable workqueues still busy?
- *
- * Check whether freezing is complete. This function must be called
- * between freeze_workqueues_begin() and thaw_workqueues().
- *
- * CONTEXT:
- * Grabs and releases wq_pool_mutex.
- *
- * Return:
- * %true if some freezable workqueues are still busy. %false if freezing
- * is complete.
- */
-bool freeze_workqueues_busy(void)
-{
- bool busy = false;
- struct workqueue_struct *wq;
- struct pool_workqueue *pwq;
+ /*
+ * unbound_attrs are created separately. Suppress uevent until
+ * everything is ready.
+ */
+ dev_set_uevent_suppress(&wq_dev->dev, true);
- mutex_lock(&wq_pool_mutex);
+ ret = device_register(&wq_dev->dev);
+ if (ret) {
+ kfree(wq_dev);
+ wq->wq_dev = NULL;
+ return ret;
+ }
- WARN_ON_ONCE(!workqueue_freezing);
+ if (wq->flags & WQ_UNBOUND) {
+ struct device_attribute *attr;
- list_for_each_entry(wq, &workqueues, list) {
- if (!(wq->flags & WQ_FREEZABLE))
- continue;
- /*
- * nr_active is monotonically decreasing. It's safe
- * to peek without lock.
- */
- rcu_read_lock_sched();
- for_each_pwq(pwq, wq) {
- WARN_ON_ONCE(pwq->nr_active < 0);
- if (pwq->nr_active) {
- busy = true;
- rcu_read_unlock_sched();
- goto out_unlock;
+ for (attr = wq_sysfs_unbound_attrs; attr->attr.name; attr++) {
+ ret = device_create_file(&wq_dev->dev, attr);
+ if (ret) {
+ device_unregister(&wq_dev->dev);
+ wq->wq_dev = NULL;
+ return ret;
}
}
- rcu_read_unlock_sched();
}
-out_unlock:
- mutex_unlock(&wq_pool_mutex);
- return busy;
+
+ dev_set_uevent_suppress(&wq_dev->dev, false);
+ kobject_uevent(&wq_dev->dev.kobj, KOBJ_ADD);
+ return 0;
}
/**
- * thaw_workqueues - thaw workqueues
- *
- * Thaw workqueues. Normal queueing is restored and all collected
- * frozen works are transferred to their respective pool worklists.
+ * workqueue_sysfs_unregister - undo workqueue_sysfs_register()
+ * @wq: the workqueue to unregister
*
- * CONTEXT:
- * Grabs and releases wq_pool_mutex, wq->mutex and pool->lock's.
+ * If @wq is registered to sysfs by workqueue_sysfs_register(), unregister.
*/
-void thaw_workqueues(void)
+static void workqueue_sysfs_unregister(struct workqueue_struct *wq)
{
- struct workqueue_struct *wq;
- struct pool_workqueue *pwq;
-
- mutex_lock(&wq_pool_mutex);
-
- if (!workqueue_freezing)
- goto out_unlock;
-
- workqueue_freezing = false;
+ struct wq_device *wq_dev = wq->wq_dev;
- /* restore max_active and repopulate worklist */
- list_for_each_entry(wq, &workqueues, list) {
- mutex_lock(&wq->mutex);
- for_each_pwq(pwq, wq)
- pwq_adjust_max_active(pwq);
- mutex_unlock(&wq->mutex);
- }
+ if (!wq->wq_dev)
+ return;
-out_unlock:
- mutex_unlock(&wq_pool_mutex);
+ wq->wq_dev = NULL;
+ device_unregister(&wq_dev->dev);
}
-#endif /* CONFIG_FREEZER */
+#else /* CONFIG_SYSFS */
+static void workqueue_sysfs_unregister(struct workqueue_struct *wq) { }
+#endif /* CONFIG_SYSFS */
static void __init wq_numa_init(void)
{