Btrfs: fix deadlock on async thread startup
diff --git a/fs/btrfs/async-thread.c b/fs/btrfs/async-thread.c
index 282ca085c2fbff854bcf4a396d0773db129c5085..c0861e781cdbdfde4b7c42ec1b90032f2059e69a 100644
@@ -63,6 +63,51 @@ struct btrfs_worker_thread {
        int idle;
 };
 
+/*
+ * btrfs_start_workers uses kthread_run, which can block waiting for memory
+ * for a very long time.  It will actually throttle on page writeback,
+ * and so it may not make progress until after our btrfs worker threads
+ * process all of the pending work structs in their queue.
+ *
+ * This means we can't use btrfs_start_workers from inside a btrfs worker
+ * thread that is used as part of cleaning dirty memory, which pretty much
+ * involves all of the worker threads.
+ *
+ * Instead we have a helper queue that never has more than one thread,
+ * where we schedule thread start operations.  This worker_start struct
+ * is used to contain the work and hold a pointer to the queue that needs
+ * another worker.
+ */
+struct worker_start {
+       struct btrfs_work work;
+       struct btrfs_workers *queue;
+};
+
+static void start_new_worker_func(struct btrfs_work *work)
+{
+       struct worker_start *start;
+       start = container_of(work, struct worker_start, work);
+       btrfs_start_workers(start->queue, 1);
+       kfree(start);
+}
+
+static int start_new_worker(struct btrfs_workers *queue)
+{
+       struct worker_start *start;
+       int ret;
+
+       start = kzalloc(sizeof(*start), GFP_NOFS);
+       if (!start)
+               return -ENOMEM;
+
+       start->work.func = start_new_worker_func;
+       start->queue = queue;
+       ret = btrfs_queue_worker(queue->atomic_worker_start, &start->work);
+       if (ret)
+               kfree(start);
+       return ret;
+}
+
 /*
  * helper function to move a thread onto the idle list after it
  * has finished some requests.
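The comment above describes the scheme: the helper queue is just another btrfs_workers pool, capped at one thread, that every other pool is pointed at when it is initialized. Below is a minimal sketch of that wiring, assuming hypothetical pool names (generic_worker, delalloc_workers) and a hypothetical setup function; only the btrfs_init_workers()/btrfs_start_workers() signatures used here come from this patch.

#include "async-thread.h"

/* illustrative pools; real callers keep theirs elsewhere */
static struct btrfs_workers generic_worker;
static struct btrfs_workers delalloc_workers;

static int example_wire_up_helper(void)
{
	/*
	 * The helper pool is capped at one thread and is given no helper
	 * of its own (NULL), so it always starts threads synchronously.
	 */
	btrfs_init_workers(&generic_worker, "genwork", 1, NULL);

	/*
	 * Ordinary pools hand thread creation off to the helper, so their
	 * own threads never block in kthread_run().
	 */
	btrfs_init_workers(&delalloc_workers, "delalloc", 16, &generic_worker);

	/* the helper needs a live thread before anyone can lean on it */
	return btrfs_start_workers(&generic_worker, 1);
}
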
@@ -118,11 +163,13 @@ static void check_pending_worker_creates(struct btrfs_worker_thread *worker)
                goto out;
 
        workers->atomic_start_pending = 0;
-       if (workers->num_workers >= workers->max_workers)
+       if (workers->num_workers + workers->num_workers_starting >=
+           workers->max_workers)
                goto out;
 
+       workers->num_workers_starting += 1;
        spin_unlock_irqrestore(&workers->lock, flags);
-       btrfs_start_workers(workers, 1);
+       start_new_worker(workers);
        return;
 
 out:
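A thread that has been handed to the helper queue does not show up in num_workers until kthread_run() finishes, so it is tracked in num_workers_starting and counted against the limit immediately, while the lock is still held; otherwise several racing queuers could each decide the pool is under max_workers and request a thread of their own. The same combined check appears again in next_worker() and find_worker() below. A small sketch of the invariant, using a hypothetical helper name:

/*
 * Hypothetical helper, not part of the patch.  The pool counts as full
 * once running threads plus threads still being created reach the limit.
 * Caller must hold workers->lock.
 */
static int pool_at_limit(struct btrfs_workers *workers)
{
	return workers->num_workers + workers->num_workers_starting >=
	       workers->max_workers;
}
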
@@ -390,9 +437,11 @@ int btrfs_stop_workers(struct btrfs_workers *workers)
 /*
  * simple init on struct btrfs_workers
  */
-void btrfs_init_workers(struct btrfs_workers *workers, char *name, int max)
+void btrfs_init_workers(struct btrfs_workers *workers, char *name, int max,
+                       struct btrfs_workers *async_helper)
 {
        workers->num_workers = 0;
+       workers->num_workers_starting = 0;
        INIT_LIST_HEAD(&workers->worker_list);
        INIT_LIST_HEAD(&workers->idle_list);
        INIT_LIST_HEAD(&workers->order_list);
@@ -404,14 +453,15 @@ void btrfs_init_workers(struct btrfs_workers *workers, char *name, int max)
        workers->name = name;
        workers->ordered = 0;
        workers->atomic_start_pending = 0;
-       workers->atomic_worker_start = 0;
+       workers->atomic_worker_start = async_helper;
 }
 
 /*
  * starts new worker threads.  This does not enforce the max worker
  * count in case you need to temporarily go past it.
  */
-int btrfs_start_workers(struct btrfs_workers *workers, int num_workers)
+static int __btrfs_start_workers(struct btrfs_workers *workers,
+                                int num_workers)
 {
        struct btrfs_worker_thread *worker;
        int ret = 0;
@@ -444,6 +494,8 @@ int btrfs_start_workers(struct btrfs_workers *workers, int num_workers)
                list_add_tail(&worker->worker_list, &workers->idle_list);
                worker->idle = 1;
                workers->num_workers++;
+               workers->num_workers_starting--;
+               WARN_ON(workers->num_workers_starting < 0);
                spin_unlock_irq(&workers->lock);
        }
        return 0;
@@ -452,6 +504,14 @@ fail:
        return ret;
 }
 
+int btrfs_start_workers(struct btrfs_workers *workers, int num_workers)
+{
+       spin_lock_irq(&workers->lock);
+       workers->num_workers_starting += num_workers;
+       spin_unlock_irq(&workers->lock);
+       return __btrfs_start_workers(workers, num_workers);
+}
+
 /*
  * run through the list and find a worker thread that doesn't have a lot
  * to do right now.  This can return null if we aren't yet at the thread
@@ -461,7 +521,10 @@ static struct btrfs_worker_thread *next_worker(struct btrfs_workers *workers)
 {
        struct btrfs_worker_thread *worker;
        struct list_head *next;
-       int enforce_min = workers->num_workers < workers->max_workers;
+       int enforce_min;
+
+       enforce_min = (workers->num_workers + workers->num_workers_starting) <
+               workers->max_workers;
 
        /*
         * if we find an idle thread, don't move it to the end of the
@@ -509,15 +572,17 @@ again:
        worker = next_worker(workers);
 
        if (!worker) {
-               if (workers->num_workers >= workers->max_workers) {
+               if (workers->num_workers + workers->num_workers_starting >=
+                   workers->max_workers) {
                        goto fallback;
                } else if (workers->atomic_worker_start) {
                        workers->atomic_start_pending = 1;
                        goto fallback;
                } else {
+                       workers->num_workers_starting++;
                        spin_unlock_irqrestore(&workers->lock, flags);
                        /* we're below the limit, start another worker */
-                       btrfs_start_workers(workers, 1);
+                       __btrfs_start_workers(workers, 1);
                        goto again;
                }
        }
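Nothing in this change alters how work is queued: when a pool that has an async helper runs out of idle threads, find_worker() now piles the work onto an existing thread and leaves thread creation to the helper queue, instead of blocking in kthread_run() itself. For reference, a hypothetical caller of the unchanged queueing API, mirroring the worker_start pattern above (the type and function names are illustrative, not taken from this patch):

#include <linux/slab.h>
#include "async-thread.h"

/* hypothetical container, same pattern as worker_start */
struct my_deferred_work {
	struct btrfs_work work;
	void *payload;
};

static void my_deferred_func(struct btrfs_work *work)
{
	struct my_deferred_work *d;

	d = container_of(work, struct my_deferred_work, work);
	/* ... act on d->payload ... */
	kfree(d);
}

static int my_defer(struct btrfs_workers *pool, void *payload)
{
	struct my_deferred_work *d;

	d = kzalloc(sizeof(*d), GFP_NOFS);
	if (!d)
		return -ENOMEM;

	d->work.func = my_deferred_func;
	d->payload = payload;

	/*
	 * Queues to an existing thread if the pool is busy; the async
	 * helper, not this caller, worries about growing the pool.
	 */
	return btrfs_queue_worker(pool, &d->work);
}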