Btrfs: fix async worker startup race
[linux-2.6-block.git] / fs / btrfs / async-thread.c
index 019e8af449abfeb1679c1a20c801d6b0082edf8b..73df627ab8abc0be8dab86a44617629a78d4e20e 100644 (file)
@@ -48,6 +48,9 @@ struct btrfs_worker_thread {
        /* number of things on the pending list */
        atomic_t num_pending;
 
+       /* reference counter for this struct */
+       atomic_t refs;
+
        unsigned long sequence;
 
        /* protects the pending list. */
@@ -71,7 +74,12 @@ static void check_idle_worker(struct btrfs_worker_thread *worker)
                unsigned long flags;
                spin_lock_irqsave(&worker->workers->lock, flags);
                worker->idle = 1;
-               list_move(&worker->worker_list, &worker->workers->idle_list);
+
+               /* the list may be empty if the worker is just starting */
+               if (!list_empty(&worker->worker_list)) {
+                       list_move(&worker->worker_list,
+                                &worker->workers->idle_list);
+               }
                spin_unlock_irqrestore(&worker->workers->lock, flags);
        }
 }
@@ -87,23 +95,49 @@ static void check_busy_worker(struct btrfs_worker_thread *worker)
                unsigned long flags;
                spin_lock_irqsave(&worker->workers->lock, flags);
                worker->idle = 0;
-               list_move_tail(&worker->worker_list,
-                              &worker->workers->worker_list);
+
+               if (!list_empty(&worker->worker_list)) {
+                       list_move_tail(&worker->worker_list,
+                                     &worker->workers->worker_list);
+               }
                spin_unlock_irqrestore(&worker->workers->lock, flags);
        }
 }
 
-static noinline int run_ordered_completions(struct btrfs_workers *workers,
-                                           struct btrfs_work *work)
+static void check_pending_worker_creates(struct btrfs_worker_thread *worker)
 {
+       struct btrfs_workers *workers = worker->workers;
        unsigned long flags;
 
+       rmb();
+       if (!workers->atomic_start_pending)
+               return;
+
+       spin_lock_irqsave(&workers->lock, flags);
+       if (!workers->atomic_start_pending)
+               goto out;
+
+       workers->atomic_start_pending = 0;
+       if (workers->num_workers >= workers->max_workers)
+               goto out;
+
+       spin_unlock_irqrestore(&workers->lock, flags);
+       btrfs_start_workers(workers, 1);
+       return;
+
+out:
+       spin_unlock_irqrestore(&workers->lock, flags);
+}
+
+static noinline int run_ordered_completions(struct btrfs_workers *workers,
+                                           struct btrfs_work *work)
+{
        if (!workers->ordered)
                return 0;
 
        set_bit(WORK_DONE_BIT, &work->flags);
 
-       spin_lock_irqsave(&workers->lock, flags);
+       spin_lock(&workers->order_lock);
 
        while (1) {
                if (!list_empty(&workers->prio_order_list)) {
@@ -126,45 +160,117 @@ static noinline int run_ordered_completions(struct btrfs_workers *workers,
                if (test_and_set_bit(WORK_ORDER_DONE_BIT, &work->flags))
                        break;
 
-               spin_unlock_irqrestore(&workers->lock, flags);
+               spin_unlock(&workers->order_lock);
 
                work->ordered_func(work);
 
                /* now take the lock again and call the freeing code */
-               spin_lock_irqsave(&workers->lock, flags);
+               spin_lock(&workers->order_lock);
                list_del(&work->order_list);
                work->ordered_free(work);
        }
 
-       spin_unlock_irqrestore(&workers->lock, flags);
+       spin_unlock(&workers->order_lock);
        return 0;
 }
 
+static void put_worker(struct btrfs_worker_thread *worker)
+{
+       if (atomic_dec_and_test(&worker->refs))
+               kfree(worker);
+}
+
+static int try_worker_shutdown(struct btrfs_worker_thread *worker)
+{
+       int freeit = 0;
+
+       spin_lock_irq(&worker->lock);
+       spin_lock_irq(&worker->workers->lock);
+       if (worker->workers->num_workers > 1 &&
+           worker->idle &&
+           !worker->working &&
+           !list_empty(&worker->worker_list) &&
+           list_empty(&worker->prio_pending) &&
+           list_empty(&worker->pending)) {
+               freeit = 1;
+               list_del_init(&worker->worker_list);
+               worker->workers->num_workers--;
+       }
+       spin_unlock_irq(&worker->workers->lock);
+       spin_unlock_irq(&worker->lock);
+
+       if (freeit)
+               put_worker(worker);
+       return freeit;
+}
+
+static struct btrfs_work *get_next_work(struct btrfs_worker_thread *worker,
+                                       struct list_head *prio_head,
+                                       struct list_head *head)
+{
+       struct btrfs_work *work = NULL;
+       struct list_head *cur = NULL;
+
+       if(!list_empty(prio_head))
+               cur = prio_head->next;
+
+       smp_mb();
+       if (!list_empty(&worker->prio_pending))
+               goto refill;
+
+       if (!list_empty(head))
+               cur = head->next;
+
+       if (cur)
+               goto out;
+
+refill:
+       spin_lock_irq(&worker->lock);
+       list_splice_tail_init(&worker->prio_pending, prio_head);
+       list_splice_tail_init(&worker->pending, head);
+
+       if (!list_empty(prio_head))
+               cur = prio_head->next;
+       else if (!list_empty(head))
+               cur = head->next;
+       spin_unlock_irq(&worker->lock);
+
+       if (!cur)
+               goto out_fail;
+
+out:
+       work = list_entry(cur, struct btrfs_work, list);
+
+out_fail:
+       return work;
+}
+
 /*
  * main loop for servicing work items
  */
 static int worker_loop(void *arg)
 {
        struct btrfs_worker_thread *worker = arg;
-       struct list_head *cur;
+       struct list_head head;
+       struct list_head prio_head;
        struct btrfs_work *work;
+
+       INIT_LIST_HEAD(&head);
+       INIT_LIST_HEAD(&prio_head);
+
        do {
-               spin_lock_irq(&worker->lock);
-again_locked:
+again:
                while (1) {
-                       if (!list_empty(&worker->prio_pending))
-                               cur = worker->prio_pending.next;
-                       else if (!list_empty(&worker->pending))
-                               cur = worker->pending.next;
-                       else
+
+
+                       work = get_next_work(worker, &prio_head, &head);
+                       if (!work)
                                break;
 
-                       work = list_entry(cur, struct btrfs_work, list);
                        list_del(&work->list);
                        clear_bit(WORK_QUEUED_BIT, &work->flags);
 
                        work->worker = worker;
-                       spin_unlock_irq(&worker->lock);
 
                        work->func(work);
 
@@ -175,9 +281,13 @@ again_locked:
                         */
                        run_ordered_completions(worker->workers, work);
 
-                       spin_lock_irq(&worker->lock);
-                       check_idle_worker(worker);
+                       check_pending_worker_creates(worker);
+
                }
+
+               spin_lock_irq(&worker->lock);
+               check_idle_worker(worker);
+
                if (freezing(current)) {
                        worker->working = 0;
                        spin_unlock_irq(&worker->lock);
@@ -216,8 +326,10 @@ again_locked:
                                spin_lock_irq(&worker->lock);
                                set_current_state(TASK_INTERRUPTIBLE);
                                if (!list_empty(&worker->pending) ||
-                                   !list_empty(&worker->prio_pending))
-                                       goto again_locked;
+                                   !list_empty(&worker->prio_pending)) {
+                                       spin_unlock_irq(&worker->lock);
+                                       goto again;
+                               }
 
                                /*
                                 * this makes sure we get a wakeup when someone
@@ -226,8 +338,13 @@ again_locked:
                                worker->working = 0;
                                spin_unlock_irq(&worker->lock);
 
-                               if (!kthread_should_stop())
-                                       schedule();
+                               if (!kthread_should_stop()) {
+                                       schedule_timeout(HZ * 120);
+                                       if (!worker->working &&
+                                           try_worker_shutdown(worker)) {
+                                               return 0;
+                                       }
+                               }
                        }
                        __set_current_state(TASK_RUNNING);
                }
@@ -242,16 +359,30 @@ int btrfs_stop_workers(struct btrfs_workers *workers)
 {
        struct list_head *cur;
        struct btrfs_worker_thread *worker;
+       int can_stop;
 
+       spin_lock_irq(&workers->lock);
        list_splice_init(&workers->idle_list, &workers->worker_list);
        while (!list_empty(&workers->worker_list)) {
                cur = workers->worker_list.next;
                worker = list_entry(cur, struct btrfs_worker_thread,
                                    worker_list);
-               kthread_stop(worker->task);
-               list_del(&worker->worker_list);
-               kfree(worker);
+
+               atomic_inc(&worker->refs);
+               workers->num_workers -= 1;
+               if (!list_empty(&worker->worker_list)) {
+                       list_del_init(&worker->worker_list);
+                       put_worker(worker);
+                       can_stop = 1;
+               } else
+                       can_stop = 0;
+               spin_unlock_irq(&workers->lock);
+               if (can_stop)
+                       kthread_stop(worker->task);
+               spin_lock_irq(&workers->lock);
+               put_worker(worker);
        }
+       spin_unlock_irq(&workers->lock);
        return 0;
 }
 
@@ -266,10 +397,13 @@ void btrfs_init_workers(struct btrfs_workers *workers, char *name, int max)
        INIT_LIST_HEAD(&workers->order_list);
        INIT_LIST_HEAD(&workers->prio_order_list);
        spin_lock_init(&workers->lock);
+       spin_lock_init(&workers->order_lock);
        workers->max_workers = max;
        workers->idle_thresh = 32;
        workers->name = name;
        workers->ordered = 0;
+       workers->atomic_start_pending = 0;
+       workers->atomic_worker_start = 0;
 }
 
 /*
@@ -293,7 +427,9 @@ int btrfs_start_workers(struct btrfs_workers *workers, int num_workers)
                INIT_LIST_HEAD(&worker->prio_pending);
                INIT_LIST_HEAD(&worker->worker_list);
                spin_lock_init(&worker->lock);
+
                atomic_set(&worker->num_pending, 0);
+               atomic_set(&worker->refs, 1);
                worker->workers = workers;
                worker->task = kthread_run(worker_loop, worker,
                                           "btrfs-%s-%d", workers->name,
@@ -303,7 +439,6 @@ int btrfs_start_workers(struct btrfs_workers *workers, int num_workers)
                        kfree(worker);
                        goto fail;
                }
-
                spin_lock_irq(&workers->lock);
                list_add_tail(&worker->worker_list, &workers->idle_list);
                worker->idle = 1;
@@ -367,28 +502,18 @@ static struct btrfs_worker_thread *find_worker(struct btrfs_workers *workers)
 {
        struct btrfs_worker_thread *worker;
        unsigned long flags;
+       struct list_head *fallback;
 
 again:
        spin_lock_irqsave(&workers->lock, flags);
        worker = next_worker(workers);
-       spin_unlock_irqrestore(&workers->lock, flags);
 
        if (!worker) {
-               spin_lock_irqsave(&workers->lock, flags);
                if (workers->num_workers >= workers->max_workers) {
-                       struct list_head *fallback = NULL;
-                       /*
-                        * we have failed to find any workers, just
-                        * return the force one
-                        */
-                       if (!list_empty(&workers->worker_list))
-                               fallback = workers->worker_list.next;
-                       if (!list_empty(&workers->idle_list))
-                               fallback = workers->idle_list.next;
-                       BUG_ON(!fallback);
-                       worker = list_entry(fallback,
-                                 struct btrfs_worker_thread, worker_list);
-                       spin_unlock_irqrestore(&workers->lock, flags);
+                       goto fallback;
+               } else if (workers->atomic_worker_start) {
+                       workers->atomic_start_pending = 1;
+                       goto fallback;
                } else {
                        spin_unlock_irqrestore(&workers->lock, flags);
                        /* we're below the limit, start another worker */
@@ -396,6 +521,23 @@ again:
                        goto again;
                }
        }
+       spin_unlock_irqrestore(&workers->lock, flags);
+       return worker;
+
+fallback:
+       fallback = NULL;
+       /*
+        * we have failed to find any workers, just
+        * return the first one we can find.
+        */
+       if (!list_empty(&workers->worker_list))
+               fallback = workers->worker_list.next;
+       if (!list_empty(&workers->idle_list))
+               fallback = workers->idle_list.next;
+       BUG_ON(!fallback);
+       worker = list_entry(fallback,
+                 struct btrfs_worker_thread, worker_list);
+       spin_unlock_irqrestore(&workers->lock, flags);
        return worker;
 }
 
@@ -435,9 +577,9 @@ int btrfs_requeue_work(struct btrfs_work *work)
                worker->working = 1;
        }
 
-       spin_unlock_irqrestore(&worker->lock, flags);
        if (wake)
                wake_up_process(worker->task);
+       spin_unlock_irqrestore(&worker->lock, flags);
 out:
 
        return 0;
@@ -463,14 +605,18 @@ int btrfs_queue_worker(struct btrfs_workers *workers, struct btrfs_work *work)
 
        worker = find_worker(workers);
        if (workers->ordered) {
-               spin_lock_irqsave(&workers->lock, flags);
+               /*
+                * you're not allowed to do ordered queues from an
+                * interrupt handler
+                */
+               spin_lock(&workers->order_lock);
                if (test_bit(WORK_HIGH_PRIO_BIT, &work->flags)) {
                        list_add_tail(&work->order_list,
                                      &workers->prio_order_list);
                } else {
                        list_add_tail(&work->order_list, &workers->order_list);
                }
-               spin_unlock_irqrestore(&workers->lock, flags);
+               spin_unlock(&workers->order_lock);
        } else {
                INIT_LIST_HEAD(&work->order_list);
        }
@@ -492,10 +638,10 @@ int btrfs_queue_worker(struct btrfs_workers *workers, struct btrfs_work *work)
                wake = 1;
        worker->working = 1;
 
-       spin_unlock_irqrestore(&worker->lock, flags);
-
        if (wake)
                wake_up_process(worker->task);
+       spin_unlock_irqrestore(&worker->lock, flags);
+
 out:
        return 0;
 }