v3 syslet engine support
authorIngo Molnar <mingo@elte.hu>
Wed, 21 Feb 2007 22:25:44 +0000 (23:25 +0100)
committerJens Axboe <axboe@nelson.home.kernel.dk>
Wed, 21 Feb 2007 22:25:44 +0000 (23:25 +0100)
Signed-off-by: Jens Axboe <jens.axboe@oracle.com>
arch-x86.h
engines/syslet-rw.c
os-linux.h
syslet.h

index 2b47a40b20bf9a69a97a576e9d5a951e71a545de..804559d6fa904981b856275e9e5d3cfa4957da18 100644 (file)
 #define __NR_sys_vmsplice      316
 #endif
 
-#ifndef __NR_async_register
-#define __NR_async_register    320
-#define        __NR_async_exec         321
-#define __NR_async_wait                322
-#define        __NR_async_unregister   323
-#define __NR_umem_add          324
+#ifndef __NR_async_exec
+# define __NR_async_exec       320
+# define __NR_async_wait       321
+# define __NR_umem_add         322
+# define __NR_async_thread     323
 #endif
 
 #define nop    __asm__ __volatile__("rep;nop": : :"memory")
index bfb6021f63f2ba6aa5fd72b7874eede23248913e..ab2cba2a4aa43ea6c1b5f3842c73cfd3e3a64c7a 100644 (file)
@@ -17,9 +17,8 @@ struct syslet_data {
        struct io_u **events;
        unsigned int nr_events;
        
-       struct async_head_user *ahu;
+       struct async_head_user ahu;
        struct syslet_uatom **ring;
-       unsigned int ring_index;
 };
 
 /*
@@ -34,13 +33,13 @@ static void fio_syslet_complete(struct thread_data *td)
                struct io_u *io_u;
                long ret;
 
-               atom = sd->ring[sd->ring_index];
+               atom = sd->ring[sd->ahu.user_ring_idx];
                if (!atom)
                        break;
 
-               sd->ring[sd->ring_index] = NULL;
-               if (++sd->ring_index == td->iodepth)
-                       sd->ring_index = 0;
+               sd->ring[sd->ahu.user_ring_idx] = NULL;
+               if (++sd->ahu.user_ring_idx == td->iodepth)
+                       sd->ahu.user_ring_idx = 0;
 
                io_u = atom->private;
                ret = *atom->ret_ptr;
@@ -74,7 +73,7 @@ static int fio_syslet_getevents(struct thread_data *td, int min,
                 * OK, we need to wait for some events...
                 */
                get_events = min - sd->nr_events;
-               ret = async_wait(get_events);
+               ret = async_wait(get_events, sd->ahu.user_ring_idx, &sd->ahu);
                if (ret < 0)
                        return errno;
        } while (1);
@@ -144,16 +143,35 @@ static int fio_syslet_prep(struct thread_data fio_unused *td, struct io_u *io_u)
        return 0;
 }
 
+static void cachemiss_thread_start(void)
+{
+       while (1)
+               async_thread();
+}
+
+#define THREAD_STACK_SIZE (16384)
+
+static unsigned long thread_stack_alloc()
+{
+       return (unsigned long)malloc(THREAD_STACK_SIZE) + THREAD_STACK_SIZE;
+}
+
 static int fio_syslet_queue(struct thread_data *td, struct io_u *io_u)
 {
        struct syslet_data *sd = td->io_ops->data;
+       struct syslet_uatom *done;
        long ret;
 
+       if (!sd->ahu.new_thread_stack)
+               sd->ahu.new_thread_stack = thread_stack_alloc();
+
        /*
         * On sync completion, the atom is returned. So on NULL return
         * it's queued asynchronously.
         */
-       if (!async_exec(&io_u->req.atom))
+       done = async_exec(&io_u->req.atom, &sd->ahu);
+
+       if (!done)
                return FIO_Q_QUEUED;
 
        /*
@@ -169,8 +187,11 @@ static int fio_syslet_queue(struct thread_data *td, struct io_u *io_u)
                        io_u->error = errno;
        }
 
+       if (sd->nr_events >= td->iodepth)
+               printf("ouch! %d\n", sd->nr_events);
+
        if (!io_u->error)
-               sd->events[sd->nr_events++] = io_u;
+               /* nothing */;
        else
                td_verror(td, io_u->error);
 
@@ -181,34 +202,24 @@ static int async_head_init(struct syslet_data *sd, unsigned int depth)
 {
        unsigned long ring_size;
 
-       sd->ahu = malloc(sizeof(struct async_head_user));
-       memset(sd->ahu, 0, sizeof(struct async_head_user));
+       memset(&sd->ahu, 0, sizeof(struct async_head_user));
 
        ring_size = sizeof(struct syslet_uatom *) * depth;
        sd->ring = malloc(ring_size);
        memset(sd->ring, 0, ring_size);
 
-       sd->ahu->completion_ring = sd->ring;
-       sd->ahu->ring_size_bytes = ring_size;
-       sd->ahu->max_nr_threads = -1;
-
-       if (async_register(sd->ahu, sizeof(*sd->ahu)) < 0) {
-               perror("async_register");
-               fprintf(stderr, "fio: syslet likely not supported\n");
-               free(sd->ring);
-               free(sd->ahu);
-               return 1;
-       }
+       sd->ahu.user_ring_idx = 0;
+       sd->ahu.completion_ring = sd->ring;
+       sd->ahu.ring_size_bytes = ring_size;
+       sd->ahu.head_stack = thread_stack_alloc();
+       sd->ahu.head_eip = (unsigned long)cachemiss_thread_start;
+       sd->ahu.new_thread_eip = (unsigned long)cachemiss_thread_start;
 
        return 0;
 }
 
 static void async_head_exit(struct syslet_data *sd)
 {
-       if (async_unregister(sd->ahu, sizeof(*sd->ahu)) < 0)
-               perror("async_register");
-
-       free(sd->ahu);
        free(sd->ring);
 }
 
index 657a679e23486e0e8e319d3cc10dd8f8a7c635f4..b147d4f3b340c0fd2bb5f352b469beb97e6b57e7 100644 (file)
@@ -81,27 +81,30 @@ static inline int vmsplice(int fd, const struct iovec *iov,
 #define SPLICE_DEF_SIZE        (64*1024)
 
 #ifdef FIO_HAVE_SYSLET
+
+struct syslet_uatom;
+struct async_head_user;
+
 /*
  * syslet stuff
  */
-static inline long async_register(void *uah, unsigned int len)
-{
-       return syscall(__NR_async_register, uah, len);
-}
-
-static inline void *async_exec(void *data)
+static inline struct syslet_uatom *
+async_exec(struct syslet_uatom *atom, struct async_head_user *ahu)
 {
-       return (void *) syscall(__NR_async_exec, data);
+       return (void *) syscall(__NR_async_exec, atom, ahu);
 }
 
-static inline long async_wait(unsigned long min_events)
+static inline long
+async_wait(unsigned long min_wait_events, unsigned long user_ring_idx,
+          struct async_head_user *ahu)
 {
-       return syscall(__NR_async_wait, min_events);
+       return syscall(__NR_async_wait, min_wait_events,
+                       user_ring_idx, ahu);
 }
 
-static inline long async_unregister(void *uah, unsigned int len)
+static inline long async_thread(void)
 {
-       return syscall(__NR_async_unregister, uah, len);
+       return syscall(__NR_async_thread);
 }
 
 static inline long umem_add(unsigned long *uptr, unsigned long inc)
index d89abd440822f0fcf74c9cb7c3cb3c9670749a02..84dd1c756d417e69c866e3410a8dfcbc7dc0fc81 100644 (file)
--- a/syslet.h
+++ b/syslet.h
@@ -78,7 +78,7 @@ struct syslet_uatom {
 
 /*
  * Execution control: conditions upon the return code
- * of the previous syslet atom. 'Stop' means syslet
+ * of the just executed syslet atom. 'Stop' means syslet
  * execution is stopped and the atom is put into the
  * completion ring:
  */
@@ -105,36 +105,51 @@ struct syslet_uatom {
 
 /*
  * This is the (per-user-context) descriptor of the async completion
- * ring. This gets registered via sys_async_register().
+ * ring. This gets passed in to sys_async_exec():
  */
 struct async_head_user {
        /*
-        * Pointers to completed async syslets (i.e. syslets that
+        * Current completion ring index - managed by the kernel:
+        */
+       unsigned long                           kernel_ring_idx;
+       /*
+        * User-side ring index:
+        */
+       unsigned long                           user_ring_idx;
+
+       /*
+        * Ring of pointers to completed async syslets (i.e. syslets that
         * generated a cachemiss and went async, returning -EASYNCSYSLET
         * to the user context by sys_async_exec()) are queued here.
-        * Syslets that were executed synchronously are not queued here.
+        * Syslets that were executed synchronously (cached) are not
+        * queued here.
         *
         * Note: the final atom that generated the exit condition is
         * queued here. Normally this would be the last atom of a syslet.
         */
        struct syslet_uatom __user              **completion_ring;
+
        /*
         * Ring size in bytes:
         */
        unsigned long                           ring_size_bytes;
 
        /*
-        * Maximum number of asynchronous contexts the kernel creates.
-        *
-        * -1UL has a special meaning: the kernel manages the optimal
-        * size of the async pool.
-        *
-        * Note: this field should be valid for the lifetime of async
-        * processing, because future kernels detect changes to this
-        * field. (enabling user-space to control the size of the async
-        * pool in a low-overhead fashion)
+        * The head task can become a cachemiss thread later on
+        * too, if it blocks - so it needs its separate thread
+        * stack and start address too:
+        */
+       unsigned long                           head_stack;
+       unsigned long                           head_eip;
+
+       /*
+        * Newly started async kernel threads will take their
+        * user stack and user start address from here. User-space
+        * code has to check for new_thread_stack going to NULL
+        * and has to refill it with a new stack if that happens.
         */
-       unsigned long                           max_nr_threads;
+       unsigned long                           new_thread_stack;
+       unsigned long                           new_thread_eip;
 };
 
 #endif