Per-cpu completion lists
authorJens Axboe <jaxboe@fusionio.com>
Fri, 8 Oct 2010 17:48:59 +0000 (19:48 +0200)
committerJens Axboe <jaxboe@fusionio.com>
Fri, 8 Oct 2010 17:48:59 +0000 (19:48 +0200)
Signed-off-by: Jens Axboe <jaxboe@fusionio.com>
main.c

diff --git a/main.c b/main.c
index 6d31311e3be619afa731529edb9bb510fc381872..45da4f6e9cc903fb510ebc6374c7c1b276a38392 100644 (file)
--- a/main.c
+++ b/main.c
@@ -28,18 +28,24 @@ static int b_major;
 
 #define B_MAX_DEVS     64
 
+struct b_dev_cpu {
+       spinlock_t lock;
+       struct list_head done_list;
+       unsigned int done_cmds;
+};
+
 struct b_dev {
        struct list_head device_list;
-       struct list_head done_list;
+       struct list_head reaped_done;
+       spinlock_t done_lock;
        atomic_t in_flight;
-       unsigned int done_cmds;
        wait_queue_head_t wq_done;
        struct block_device *bdev;
-       spinlock_t lock;
        atomic_t ref;
        struct file *file;
        struct device *dev;
        int minor;
+       struct b_dev_cpu __percpu *cpu_queue;
        struct rcu_head rcu_free;
 };
 
@@ -133,7 +139,10 @@ static void b_dev_remove_lookup(struct b_dev *bd)
 
 static void bd_rcu_free(struct rcu_head *head)
 {
-       kfree(container_of(head, struct b_dev, rcu_free));
+       struct b_dev *bd = container_of(head, struct b_dev, rcu_free);
+
+       free_percpu(bd->cpu_queue);
+       kfree(bd);
 }
 
 static void b_dev_put(struct b_dev *bd)
@@ -170,15 +179,53 @@ static struct b_cmd *get_free_command(struct b_dev *bd)
 static struct b_cmd *get_completed_command(struct b_dev *bd)
 {
        struct b_cmd *bc = NULL;
+       int cpu, spliced = 0;
 
-       spin_lock_irq(&bd->lock);
-       if (!list_empty(&bd->done_list)) {
-               bc = list_entry(bd->done_list.next, struct b_cmd, list);
-               bd->done_cmds--;
+       spin_lock(&bd->done_lock);
+       if (!list_empty(&bd->reaped_done)) {
+ret_one:
+               bc = list_entry(bd->reaped_done.next, struct b_cmd, list);
                list_del_init(&bc->list);
        }
-       spin_unlock_irq(&bd->lock);
-       return bc;
+       spin_unlock(&bd->done_lock);
+
+       if (bc)
+               return bc;
+       
+       spin_lock(&bd->done_lock);
+       for_each_possible_cpu(cpu) {
+               struct b_dev_cpu *bdc = per_cpu_ptr(bd->cpu_queue, cpu);
+
+               spin_lock_irq(&bdc->lock);
+               if (!list_empty(&bdc->done_list)) {
+                       list_splice_init(&bdc->done_list, &bd->reaped_done);
+                       bdc->done_cmds = 0;
+                       spliced++;
+               }
+               spin_unlock_irq(&bdc->lock);
+       }
+
+       if (spliced)
+               goto ret_one;
+
+       spin_unlock(&bd->done_lock);
+       return NULL;
+}
+
+static int bd_done_count(struct b_dev *bd)
+{
+       int cpu, done;
+
+       done = 0;
+       for_each_possible_cpu(cpu) {
+               struct b_dev_cpu *bdc = per_cpu_ptr(bd->cpu_queue, cpu);
+
+               spin_lock_irq(&bdc->lock);
+               done += bdc->done_cmds;
+               spin_unlock_irq(&bdc->lock);
+       }
+
+       return done;
 }
 
 static struct b_cmd *get_done_command(struct b_dev *bd, int block)
@@ -194,7 +241,7 @@ static struct b_cmd *get_done_command(struct b_dev *bd, int block)
                if (!block)
                        break;
 
-               ret = wait_event_interruptible(bd->wq_done, bd->done_cmds);
+               ret = wait_event_interruptible(bd->wq_done, bd_done_count(bd));
                if (ret) {
                        bc = ERR_PTR(-ERESTARTSYS);
                        break;
@@ -266,6 +313,7 @@ static void b_cmd_endio(struct bio *bio, int error)
 {
        struct b_cmd *bc = bio->bi_private;
        struct b_dev *bd = bc->bd;
+       struct b_dev_cpu *bdc;
        unsigned long flags;
        unsigned long now;
 
@@ -273,10 +321,13 @@ static void b_cmd_endio(struct bio *bio, int error)
        bc->cmd.nsec = now - bc->issue_time;
        bc->cmd.error = error;
 
-       spin_lock_irqsave(&bd->lock, flags);
-       list_add_tail(&bc->list, &bd->done_list);
-       bd->done_cmds++;
-       spin_unlock_irqrestore(&bd->lock, flags);
+       local_irq_save(flags);
+       bdc = per_cpu_ptr(bd->cpu_queue, smp_processor_id());
+
+       spin_lock(&bdc->lock);
+       list_add_tail(&bc->list, &bdc->done_list);
+       bdc->done_cmds++;
+       spin_unlock_irqrestore(&bdc->lock, flags);
 
        atomic_dec(&bd->in_flight);
 
@@ -423,13 +474,18 @@ static unsigned int b_dev_poll(struct file *file, poll_table *wait)
 {
        struct b_dev *bd = file->private_data;
        unsigned int mask = POLLOUT;
+       int cpu;
 
        poll_wait(file, &bd->wq_done, wait);
 
-       spin_lock_irq(&bd->lock);
-       if (!list_empty(&bd->done_list))
-               mask |= POLLIN | POLLRDNORM;
-       spin_unlock_irq(&bd->lock);
+       for_each_possible_cpu(cpu) {
+               struct b_dev_cpu *bdc = per_cpu_ptr(bd->cpu_queue, cpu);
+
+               if (!list_empty_careful(&bdc->done_list)) {
+                       mask |= POLLIN | POLLRDNORM;
+                       break;
+               }
+       }
 
        return mask;
 }
@@ -596,7 +652,7 @@ static int b_add_dev(struct b_ioctl_cmd *bic)
        struct inode *inode;
        struct file *file;
        struct b_dev *bd;
-       int ret;
+       int ret, cpu;
 
        file = fget(bic->fd);
        if (!file)
@@ -622,9 +678,25 @@ static int b_add_dev(struct b_ioctl_cmd *bic)
                goto out_put;
        }
 
+       bd->cpu_queue = alloc_percpu(struct b_dev_cpu);
+       if (!bd->cpu_queue) {
+               kfree(bd);
+               ret = -ENOMEM;
+               goto out_put;
+       }
+
+       for_each_possible_cpu(cpu) {
+               struct b_dev_cpu *bdc;
+
+               bdc = per_cpu_ptr(bd->cpu_queue, cpu);
+               INIT_LIST_HEAD(&bdc->done_list);
+               bdc->done_cmds = 0;
+               spin_lock_init(&bdc->lock);
+       }
+
        atomic_set(&bd->ref, 1);
-       spin_lock_init(&bd->lock);
-       INIT_LIST_HEAD(&bd->done_list);
+       spin_lock_init(&bd->done_lock);
+       INIT_LIST_HEAD(&bd->reaped_done);
        init_waitqueue_head(&bd->wq_done);
        bd->file = file;
        bd->bdev = inode->i_bdev;;
@@ -657,6 +729,7 @@ out_idr:
        idr_remove(&b_minor_idr, bd->minor);
 out_unlock:
        spin_unlock(&b_dev_lock);
+       free_percpu(bd->cpu_queue);
        kfree(bd);
 out_put:
        fput(file);