/*
* The per-CPU workqueue (if single thread, we always use the first
* possible cpu).
- *
- * The sequence counters are for flush_scheduled_work(). It wants to wait
- * until all currently-scheduled works are completed, but it doesn't
- * want to be livelocked by new, incoming ones. So it waits until
- * remove_sequence is >= the insert_sequence which pertained when
- * flush_scheduled_work() was called.
*/
struct cpu_workqueue_struct {
spinlock_t lock;
- long remove_sequence; /* Least-recently added (next to run) */
- long insert_sequence; /* Next to add */
-
struct list_head worklist;
wait_queue_head_t more_work;
- wait_queue_head_t work_done;
struct workqueue_struct *wq;
struct task_struct *thread;
+ struct work_struct *current_work;
int run_depth; /* Detect run_workqueue() recursion depth */
&& work_pending(work)
&& !list_empty(&work->entry)) {
work_func_t f = work->func;
+ cwq->current_work = work;
list_del_init(&work->entry);
spin_unlock_irqrestore(&cwq->lock, flags);
f(work);
spin_lock_irqsave(&cwq->lock, flags);
- cwq->remove_sequence++;
- wake_up(&cwq->work_done);
+ cwq->current_work = NULL;
ret = 1;
}
spin_unlock_irqrestore(&cwq->lock, flags);
}
EXPORT_SYMBOL(run_scheduled_work);
+static void insert_work(struct cpu_workqueue_struct *cwq,
+ struct work_struct *work, int tail)
+{
+ set_wq_data(work, cwq);
+ if (tail)
+ list_add_tail(&work->entry, &cwq->worklist);
+ else
+ list_add(&work->entry, &cwq->worklist);
+ wake_up(&cwq->more_work);
+}
+
/* Preempt must be disabled. */
static void __queue_work(struct cpu_workqueue_struct *cwq,
struct work_struct *work)
unsigned long flags;
spin_lock_irqsave(&cwq->lock, flags);
- set_wq_data(work, cwq);
- list_add_tail(&work->entry, &cwq->worklist);
- cwq->insert_sequence++;
- wake_up(&cwq->more_work);
+ insert_work(cwq, work, 1);
spin_unlock_irqrestore(&cwq->lock, flags);
}
}
EXPORT_SYMBOL_GPL(queue_work);
-static void delayed_work_timer_fn(unsigned long __data)
+void delayed_work_timer_fn(unsigned long __data)
{
struct delayed_work *dwork = (struct delayed_work *)__data;
struct workqueue_struct *wq = get_wq_data(&dwork->work);
struct timer_list *timer = &dwork->timer;
struct work_struct *work = &dwork->work;
+ timer_stats_timer_set_start_info(timer);
if (delay == 0)
return queue_work(wq, work);
struct work_struct, entry);
work_func_t f = work->func;
+ cwq->current_work = work;
list_del_init(cwq->worklist.next);
spin_unlock_irqrestore(&cwq->lock, flags);
}
spin_lock_irqsave(&cwq->lock, flags);
- cwq->remove_sequence++;
- wake_up(&cwq->work_done);
+ cwq->current_work = NULL;
}
cwq->run_depth--;
spin_unlock_irqrestore(&cwq->lock, flags);
return 0;
}
+struct wq_barrier {
+ struct work_struct work;
+ struct completion done;
+};
+
+static void wq_barrier_func(struct work_struct *work)
+{
+ struct wq_barrier *barr = container_of(work, struct wq_barrier, work);
+ complete(&barr->done);
+}
+
+static inline void init_wq_barrier(struct wq_barrier *barr)
+{
+ INIT_WORK(&barr->work, wq_barrier_func);
+ __set_bit(WORK_STRUCT_PENDING, work_data_bits(&barr->work));
+
+ init_completion(&barr->done);
+}
+
static void flush_cpu_workqueue(struct cpu_workqueue_struct *cwq)
{
if (cwq->thread == current) {
* Probably keventd trying to flush its own queue. So simply run
* it by hand rather than deadlocking.
*/
+ mutex_unlock(&workqueue_mutex);
run_workqueue(cwq);
+ mutex_lock(&workqueue_mutex);
} else {
- DEFINE_WAIT(wait);
- long sequence_needed;
+ struct wq_barrier barr;
- spin_lock_irq(&cwq->lock);
- sequence_needed = cwq->insert_sequence;
+ init_wq_barrier(&barr);
+ __queue_work(cwq, &barr.work);
- while (sequence_needed - cwq->remove_sequence > 0) {
- prepare_to_wait(&cwq->work_done, &wait,
- TASK_UNINTERRUPTIBLE);
- spin_unlock_irq(&cwq->lock);
- schedule();
- spin_lock_irq(&cwq->lock);
- }
- finish_wait(&cwq->work_done, &wait);
- spin_unlock_irq(&cwq->lock);
+ mutex_unlock(&workqueue_mutex);
+ wait_for_completion(&barr.done);
+ mutex_lock(&workqueue_mutex);
}
}
* Forces execution of the workqueue and blocks until its completion.
* This is typically used in driver shutdown handlers.
*
- * This function will sample each workqueue's current insert_sequence number and
- * will sleep until the head sequence is greater than or equal to that. This
- * means that we sleep until all works which were queued on entry have been
- * handled, but we are not livelocked by new incoming ones.
+ * We sleep until all works which were queued on entry have been handled,
+ * but we are not livelocked by new incoming ones.
*
* This function used to run the workqueues itself. Now we just wait for the
* helper threads to do it.
*/
void fastcall flush_workqueue(struct workqueue_struct *wq)
{
- might_sleep();
-
+ mutex_lock(&workqueue_mutex);
if (is_single_threaded(wq)) {
/* Always use first cpu's area. */
flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, singlethread_cpu));
} else {
int cpu;
- mutex_lock(&workqueue_mutex);
for_each_online_cpu(cpu)
flush_cpu_workqueue(per_cpu_ptr(wq->cpu_wq, cpu));
- mutex_unlock(&workqueue_mutex);
}
+ mutex_unlock(&workqueue_mutex);
}
EXPORT_SYMBOL_GPL(flush_workqueue);
+static void wait_on_work(struct cpu_workqueue_struct *cwq,
+ struct work_struct *work)
+{
+ struct wq_barrier barr;
+ int running = 0;
+
+ spin_lock_irq(&cwq->lock);
+ if (unlikely(cwq->current_work == work)) {
+ init_wq_barrier(&barr);
+ insert_work(cwq, &barr.work, 0);
+ running = 1;
+ }
+ spin_unlock_irq(&cwq->lock);
+
+ if (unlikely(running)) {
+ mutex_unlock(&workqueue_mutex);
+ wait_for_completion(&barr.done);
+ mutex_lock(&workqueue_mutex);
+ }
+}
+
+/**
+ * flush_work - block until a work_struct's callback has terminated
+ * @wq: the workqueue on which the work is queued
+ * @work: the work which is to be flushed
+ *
+ * flush_work() will attempt to cancel the work if it is queued. If the work's
+ * callback appears to be running, flush_work() will block until it has
+ * completed.
+ *
+ * flush_work() is designed to be used when the caller is tearing down data
+ * structures which the callback function operates upon. It is expected that,
+ * prior to calling flush_work(), the caller has arranged for the work to not
+ * be requeued.
+ */
+void flush_work(struct workqueue_struct *wq, struct work_struct *work)
+{
+ struct cpu_workqueue_struct *cwq;
+
+ mutex_lock(&workqueue_mutex);
+ cwq = get_wq_data(work);
+ /* Was it ever queued ? */
+ if (!cwq)
+ goto out;
+
+ /*
+ * This work can't be re-queued, and the lock above protects us
+ * from take_over_work(), no need to re-check that get_wq_data()
+ * is still the same when we take cwq->lock.
+ */
+ spin_lock_irq(&cwq->lock);
+ list_del_init(&work->entry);
+ work_release(work);
+ spin_unlock_irq(&cwq->lock);
+
+ if (is_single_threaded(wq)) {
+ /* Always use first cpu's area. */
+ wait_on_work(per_cpu_ptr(wq->cpu_wq, singlethread_cpu), work);
+ } else {
+ int cpu;
+
+ for_each_online_cpu(cpu)
+ wait_on_work(per_cpu_ptr(wq->cpu_wq, cpu), work);
+ }
+out:
+ mutex_unlock(&workqueue_mutex);
+}
+EXPORT_SYMBOL_GPL(flush_work);
+
static struct task_struct *create_workqueue_thread(struct workqueue_struct *wq,
int cpu, int freezeable)
{
spin_lock_init(&cwq->lock);
cwq->wq = wq;
cwq->thread = NULL;
- cwq->insert_sequence = 0;
- cwq->remove_sequence = 0;
cwq->freezeable = freezeable;
INIT_LIST_HEAD(&cwq->worklist);
init_waitqueue_head(&cwq->more_work);
- init_waitqueue_head(&cwq->work_done);
if (is_single_threaded(wq))
p = kthread_create(worker_thread, cwq, "%s", wq->name);
* After waiting for a given time this puts a job in the kernel-global
* workqueue.
*/
-int fastcall schedule_delayed_work(struct delayed_work *dwork, unsigned long delay)
+int fastcall schedule_delayed_work(struct delayed_work *dwork,
+ unsigned long delay)
{
+ timer_stats_timer_set_start_info(&dwork->timer);
return queue_delayed_work(keventd_wq, dwork, delay);
}
EXPORT_SYMBOL(schedule_delayed_work);
if (!works)
return -ENOMEM;
- mutex_lock(&workqueue_mutex);
+ preempt_disable(); /* CPU hotplug */
for_each_online_cpu(cpu) {
struct work_struct *work = per_cpu_ptr(works, cpu);
set_bit(WORK_STRUCT_PENDING, work_data_bits(work));
__queue_work(per_cpu_ptr(keventd_wq->cpu_wq, cpu), work);
}
- mutex_unlock(&workqueue_mutex);
+ preempt_enable();
flush_workqueue(keventd_wq);
free_percpu(works);
return 0;
}
EXPORT_SYMBOL(flush_scheduled_work);
+void flush_work_keventd(struct work_struct *work)
+{
+ flush_work(keventd_wq, work);
+}
+EXPORT_SYMBOL(flush_work_keventd);
+
/**
* cancel_rearming_delayed_workqueue - reliably kill off a delayed work whose handler rearms the delayed work.
* @wq: the controlling workqueue structure