Fix a second potential rpc_wakeup race...
[linux-2.6-block.git] / net / sunrpc / sched.c
index 6390461a97563852ce72558a77d3090c0e4750e0..66d01365f3a5c98f9a7f2439376e9338df3e78b5 100644 (file)
@@ -266,12 +266,28 @@ static int rpc_wait_bit_interruptible(void *word)
        return 0;
 }
 
+static void rpc_set_active(struct rpc_task *task)
+{
+       if (test_and_set_bit(RPC_TASK_ACTIVE, &task->tk_runstate) != 0)
+               return;
+       spin_lock(&rpc_sched_lock);
+#ifdef RPC_DEBUG
+       task->tk_magic = RPC_TASK_MAGIC_ID;
+       task->tk_pid = rpc_task_id++;
+#endif
+       /* Add to global list of all tasks */
+       list_add_tail(&task->tk_task, &all_tasks);
+       spin_unlock(&rpc_sched_lock);
+}
+
 /*
  * Mark an RPC call as having completed by clearing the 'active' bit
  */
-static inline void rpc_mark_complete_task(struct rpc_task *task)
+static void rpc_mark_complete_task(struct rpc_task *task)
 {
-       rpc_clear_active(task);
+       smp_mb__before_clear_bit();
+       clear_bit(RPC_TASK_ACTIVE, &task->tk_runstate);
+       smp_mb__after_clear_bit();
        wake_up_bit(&task->tk_runstate, RPC_TASK_ACTIVE);
 }
 
@@ -295,13 +311,15 @@ EXPORT_SYMBOL(__rpc_wait_for_completion_task);
  */
 static void rpc_make_runnable(struct rpc_task *task)
 {
-       int do_ret;
-
        BUG_ON(task->tk_timeout_fn);
-       do_ret = rpc_test_and_set_running(task);
        rpc_clear_queued(task);
-       if (do_ret)
+       if (rpc_test_and_set_running(task))
                return;
+       /* We might have raced */
+       if (RPC_IS_QUEUED(task)) {
+               rpc_clear_running(task);
+               return;
+       }
        if (RPC_IS_ASYNC(task)) {
                int status;
 
@@ -333,9 +351,6 @@ static void __rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
                return;
        }
 
-       /* Mark the task as being activated if so needed */
-       rpc_set_active(task);
-
        __rpc_add_wait_queue(q, task);
 
        BUG_ON(task->tk_callback != NULL);
@@ -346,6 +361,9 @@ static void __rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
 void rpc_sleep_on(struct rpc_wait_queue *q, struct rpc_task *task,
                                rpc_action action, rpc_action timer)
 {
+       /* Mark the task as being activated if so needed */
+       rpc_set_active(task);
+
        /*
         * Protect the queue operations.
         */
@@ -671,8 +689,6 @@ static int __rpc_execute(struct rpc_task *task)
        }
 
        dprintk("RPC: %4d, return %d, status %d\n", task->tk_pid, status, task->tk_status);
-       /* Wake up anyone who is waiting for task completion */
-       rpc_mark_complete_task(task);
        /* Release all resources associated with the task */
        rpc_release_task(task);
        return status;
@@ -786,15 +802,6 @@ void rpc_init_task(struct rpc_task *task, struct rpc_clnt *clnt, int flags, cons
                        task->tk_flags |= RPC_TASK_NOINTR;
        }
 
-#ifdef RPC_DEBUG
-       task->tk_magic = RPC_TASK_MAGIC_ID;
-       task->tk_pid = rpc_task_id++;
-#endif
-       /* Add to global list of all tasks */
-       spin_lock(&rpc_sched_lock);
-       list_add_tail(&task->tk_task, &all_tasks);
-       spin_unlock(&rpc_sched_lock);
-
        BUG_ON(task->tk_ops == NULL);
 
        /* starting timestamp */
@@ -847,16 +854,35 @@ cleanup:
        goto out;
 }
 
-void rpc_release_task(struct rpc_task *task)
+
+void rpc_put_task(struct rpc_task *task)
 {
        const struct rpc_call_ops *tk_ops = task->tk_ops;
        void *calldata = task->tk_calldata;
 
+       if (!atomic_dec_and_test(&task->tk_count))
+               return;
+       /* Release resources */
+       if (task->tk_rqstp)
+               xprt_release(task);
+       if (task->tk_msg.rpc_cred)
+               rpcauth_unbindcred(task);
+       if (task->tk_client) {
+               rpc_release_client(task->tk_client);
+               task->tk_client = NULL;
+       }
+       if (task->tk_flags & RPC_TASK_DYNAMIC)
+               rpc_free_task(task);
+       if (tk_ops->rpc_release)
+               tk_ops->rpc_release(calldata);
+}
+EXPORT_SYMBOL(rpc_put_task);
+
+void rpc_release_task(struct rpc_task *task)
+{
 #ifdef RPC_DEBUG
        BUG_ON(task->tk_magic != RPC_TASK_MAGIC_ID);
 #endif
-       if (!atomic_dec_and_test(&task->tk_count))
-               return;
        dprintk("RPC: %4d release task\n", task->tk_pid);
 
        /* Remove from global task list */
@@ -869,23 +895,13 @@ void rpc_release_task(struct rpc_task *task)
        /* Synchronously delete any running timer */
        rpc_delete_timer(task);
 
-       /* Release resources */
-       if (task->tk_rqstp)
-               xprt_release(task);
-       if (task->tk_msg.rpc_cred)
-               rpcauth_unbindcred(task);
-       if (task->tk_client) {
-               rpc_release_client(task->tk_client);
-               task->tk_client = NULL;
-       }
-
 #ifdef RPC_DEBUG
        task->tk_magic = 0;
 #endif
-       if (task->tk_flags & RPC_TASK_DYNAMIC)
-               rpc_free_task(task);
-       if (tk_ops->rpc_release)
-               tk_ops->rpc_release(calldata);
+       /* Wake up anyone who is waiting for task completion */
+       rpc_mark_complete_task(task);
+
+       rpc_put_task(task);
 }
 
 /**
@@ -1059,10 +1075,10 @@ rpc_destroy_mempool(void)
                mempool_destroy(rpc_buffer_mempool);
        if (rpc_task_mempool)
                mempool_destroy(rpc_task_mempool);
-       if (rpc_task_slabp && kmem_cache_destroy(rpc_task_slabp))
-               printk(KERN_INFO "rpc_task: not all structures were freed\n");
-       if (rpc_buffer_slabp && kmem_cache_destroy(rpc_buffer_slabp))
-               printk(KERN_INFO "rpc_buffers: not all structures were freed\n");
+       if (rpc_task_slabp)
+               kmem_cache_destroy(rpc_task_slabp);
+       if (rpc_buffer_slabp)
+               kmem_cache_destroy(rpc_buffer_slabp);
 }
 
 int