[PATCH] First cut syslet async io support
authorJens Axboe <jens.axboe@oracle.com>
Wed, 14 Feb 2007 00:16:39 +0000 (01:16 +0100)
committerJens Axboe <jens.axboe@oracle.com>
Wed, 14 Feb 2007 00:16:39 +0000 (01:16 +0100)
Doesn't seem to perform as well as expected, needs investigation.

Signed-off-by: Jens Axboe <jens.axboe@oracle.com>
Makefile
arch-x86.h
engines/syslet-rw.c [new file with mode: 0644]
fio.h
init.c
log.c
os-linux.h
syslet.h [new file with mode: 0644]
verify.c

index 1fa29ca..2b027db 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -17,6 +17,7 @@ OBJS += engines/splice.o
 OBJS += engines/sync.o
 OBJS += engines/null.o
 OBJS += engines/net.o
+OBJS += engines/syslet-rw.o
 
 INSTALL = install
 prefix = /usr/local
index 96b4602..2b47a40 100644 (file)
 #define __NR_sys_vmsplice      316
 #endif
 
+#ifndef __NR_async_register
+#define __NR_async_register    320
+#define        __NR_async_exec         321
+#define __NR_async_wait                322
+#define        __NR_async_unregister   323
+#define __NR_umem_add          324
+#endif
+
 #define nop    __asm__ __volatile__("rep;nop": : :"memory")
 
 static inline unsigned long ffz(unsigned long bitmask)
diff --git a/engines/syslet-rw.c b/engines/syslet-rw.c
new file mode 100644 (file)
index 0000000..7099a6a
--- /dev/null
@@ -0,0 +1,283 @@
+/*
+ * read/write() engine that uses syslet to be async
+ *
+ */
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <errno.h>
+#include <assert.h>
+
+#include "../fio.h"
+#include "../os.h"
+
+#ifdef FIO_HAVE_SYSLET
+
+struct syslet_data {
+       struct io_u **events;
+       unsigned int nr_events;
+       
+       struct syslet_uatom **ring;
+       unsigned int ring_index;
+};
+
+/*
+ * Inspect the ring to see if we have completed events
+ */
+static void fio_syslet_complete(struct thread_data *td)
+{
+       struct syslet_data *sd = td->io_ops->data;
+
+       do {
+               struct syslet_uatom *atom;
+               struct io_u *io_u;
+               long ret;
+
+               atom = sd->ring[sd->ring_index];
+               if (!atom)
+                       break;
+
+               sd->ring[sd->ring_index] = NULL;
+               if (++sd->ring_index == td->iodepth)
+                       sd->ring_index = 0;
+
+               io_u = atom->private;
+               ret = *atom->ret_ptr;
+               if (ret > 0)
+                       io_u->resid = io_u->xfer_buflen - ret;
+               else if (ret < 0)
+                       io_u->error = ret;
+
+               sd->events[sd->nr_events++] = io_u;
+       } while (1);
+}
+
+static int fio_syslet_getevents(struct thread_data *td, int min,
+                               int fio_unused max,
+                               struct timespec fio_unused *t)
+{
+       struct syslet_data *sd = td->io_ops->data;
+       int get_events;
+       long ret;
+
+       do {
+               fio_syslet_complete(td);
+
+               /*
+                * do we have enough immediate completions?
+                */
+               if (sd->nr_events >= (unsigned int) min)
+                       break;
+
+               /*
+                * OK, we need to wait for some events...
+                */
+               get_events = min - sd->nr_events;
+               ret = async_wait(get_events);
+               if (ret < 0)
+                       return errno;
+       } while (1);
+
+       ret = sd->nr_events;
+       sd->nr_events = 0;
+       return ret;
+}
+
+static struct io_u *fio_syslet_event(struct thread_data *td, int event)
+{
+       struct syslet_data *sd = td->io_ops->data;
+
+       return sd->events[event];
+}
+
+static void init_atom(struct syslet_uatom *atom, int nr, void *arg0,
+                     void *arg1, void *arg2, void *ret_ptr,
+                     unsigned long flags, void *priv,struct syslet_uatom *next)
+{
+       atom->flags = flags;
+       atom->nr = nr;
+       atom->ret_ptr = ret_ptr;
+       atom->next = next;
+       atom->arg_ptr[0] = arg0;
+       atom->arg_ptr[1] = arg1;
+       atom->arg_ptr[2] = arg2;
+       atom->arg_ptr[3] = atom->arg_ptr[4] = atom->arg_ptr[5] = NULL;
+       atom->private = priv;
+}
+
+/*
+ * Use seek atom for sync
+ */
+static void fio_syslet_prep_sync(struct io_u *io_u, struct fio_file *f)
+{
+       init_atom(&io_u->seek_atom.atom, __NR_fsync, &f->fd, NULL, NULL,
+                 &io_u->seek_atom.ret, SYSLET_STOP_ON_NEGATIVE, io_u, NULL);
+}
+
+static void fio_syslet_prep_rw(struct io_u *io_u, struct fio_file *f)
+{
+       int nr;
+
+       /*
+        * prepare seek
+        */
+       io_u->seek_atom.cmd = SEEK_SET;
+       init_atom(&io_u->seek_atom.atom, __NR_lseek, &f->fd, &io_u->offset,
+                 &io_u->seek_atom.cmd, &io_u->seek_atom.ret,
+                 SYSLET_STOP_ON_NEGATIVE | SYSLET_NO_COMPLETE |
+                       SYSLET_SKIP_TO_NEXT_ON_STOP,
+                 NULL, &io_u->rw_atom.atom);
+
+       /*
+        * prepare rw
+        */
+       if (io_u->ddir == DDIR_READ)
+               nr = __NR_read;
+       else
+               nr = __NR_write;
+
+       init_atom(&io_u->rw_atom.atom, nr, &f->fd, &io_u->xfer_buf,
+                 &io_u->xfer_buflen, &io_u->rw_atom.ret,
+                 SYSLET_STOP_ON_NEGATIVE | SYSLET_SKIP_TO_NEXT_ON_STOP,
+                 io_u, NULL);
+}
+
+static int fio_syslet_prep(struct thread_data fio_unused *td, struct io_u *io_u)
+{
+       struct fio_file *f = io_u->file;
+
+       if (io_u->ddir == DDIR_SYNC)
+               fio_syslet_prep_sync(io_u, f);
+       else
+               fio_syslet_prep_rw(io_u, f);
+
+       return 0;
+}
+
+static int fio_syslet_queue(struct thread_data *td, struct io_u *io_u)
+{
+       struct syslet_data *sd = td->io_ops->data;
+       struct syslet_uatom *done;
+       long ret;
+
+       done = async_exec(&io_u->seek_atom.atom);
+       if (!done)
+               return 0;
+
+       /*
+        * completed sync
+        */
+       ret = io_u->rw_atom.ret;
+       if (ret != (long) io_u->xfer_buflen) {
+               if (ret > 0) {
+                       io_u->resid = io_u->xfer_buflen - ret;
+                       io_u->error = 0;
+                       return ret;
+               } else
+                       io_u->error = errno;
+       }
+
+       if (!io_u->error)
+               sd->events[sd->nr_events++] = io_u;
+       else
+               td_verror(td, io_u->error);
+
+       return io_u->error;
+}
+
+static void async_head_init(struct syslet_data *sd, unsigned int depth)
+{
+       struct async_head_user ahu;
+       unsigned long ring_size;
+
+       ring_size = sizeof(struct syslet_uatom *) * depth;
+       sd->ring = malloc(ring_size);
+       memset(sd->ring, 0, ring_size);
+
+       memset(&ahu, 0, sizeof(ahu));
+       ahu.completion_ring = sd->ring;
+       ahu.ring_size_bytes = ring_size;
+       ahu.max_nr_threads = -1;
+
+       if (async_register(&ahu, sizeof(ahu)) < 0)
+               perror("async_register");
+}
+
+static void async_head_exit(struct syslet_data *sd, unsigned int depth)
+{
+       struct async_head_user ahu;
+
+       memset(&ahu, 0, sizeof(ahu));
+       ahu.completion_ring = sd->ring;
+       ahu.ring_size_bytes = sizeof(struct syslet_uatom *) * depth;
+
+       if (async_unregister(&ahu, sizeof(ahu)) < 0)
+               perror("async_register");
+}
+
+static void fio_syslet_cleanup(struct thread_data *td)
+{
+       struct syslet_data *sd = td->io_ops->data;
+
+       if (sd) {
+               async_head_exit(sd, td->iodepth);
+               free(sd->events);
+               free(sd);
+               td->io_ops->data = NULL;
+       }
+}
+
+static int fio_syslet_init(struct thread_data *td)
+{
+       struct syslet_data *sd;
+
+       sd = malloc(sizeof(*sd));
+       memset(sd, 0, sizeof(*sd));
+       sd->events = malloc(sizeof(struct io_u *) * td->iodepth);
+       memset(sd->events, 0, sizeof(struct io_u *) * td->iodepth);
+       td->io_ops->data = sd;
+       async_head_init(sd, td->iodepth);
+       return 0;
+}
+
+static struct ioengine_ops ioengine = {
+       .name           = "syslet-rw",
+       .version        = FIO_IOOPS_VERSION,
+       .init           = fio_syslet_init,
+       .prep           = fio_syslet_prep,
+       .queue          = fio_syslet_queue,
+       .getevents      = fio_syslet_getevents,
+       .event          = fio_syslet_event,
+       .cleanup        = fio_syslet_cleanup,
+};
+
+#else /* FIO_HAVE_SYSLET */
+
+/*
+ * When we have a proper configure system in place, we simply wont build
+ * and install this io engine. For now install a crippled version that
+ * just complains and fails to load.
+ */
+static int fio_syslet_init(struct thread_data fio_unused *td)
+{
+       fprintf(stderr, "fio: syslet not available\n");
+       return 1;
+}
+
+static struct ioengine_ops ioengine = {
+       .name           = "syslet-rw",
+       .version        = FIO_IOOPS_VERSION,
+       .init           = fio_syslet_init,
+};
+
+#endif /* FIO_HAVE_SYSLET */
+
+static void fio_init fio_syslet_register(void)
+{
+       register_ioengine(&ioengine);
+}
+
+static void fio_exit fio_syslet_unregister(void)
+{
+       unregister_ioengine(&ioengine);
+}
diff --git a/fio.h b/fio.h
index b420db5..ff59b46 100644 (file)
--- a/fio.h
+++ b/fio.h
@@ -18,6 +18,8 @@
 #include "arch.h"
 #include "os.h"
 
+#include "syslet.h"
+
 enum fio_ddir {
        DDIR_READ = 0,
        DDIR_WRITE,
@@ -61,10 +63,18 @@ struct io_piece {
        struct list_head list;
        struct fio_file *file;
        unsigned long long offset;
-       unsigned int len;
+       unsigned long len;
        enum fio_ddir ddir;
 };
 
+#ifdef FIO_HAVE_SYSLET
+struct syslet_req {
+       struct syslet_uatom atom;
+       unsigned long cmd;
+       long ret;
+};
+#endif
+
 /*
  * The io unit
  */
@@ -78,6 +88,10 @@ struct io_u {
 #endif
 #ifdef FIO_HAVE_SGIO
                struct sg_io_hdr hdr;
+#endif
+#ifdef FIO_HAVE_SYSLET
+               struct syslet_req rw_atom;
+               struct syslet_req seek_atom;
 #endif
        };
        struct timeval start_time;
@@ -87,7 +101,7 @@ struct io_u {
         * Allocated/set buffer and length
         */
        void *buf;
-       unsigned int buflen;
+       unsigned long buflen;
        unsigned long long offset;
 
        /*
@@ -95,7 +109,7 @@ struct io_u {
         * partial transfers / residual data counts
         */
        void *xfer_buf;
-       unsigned int xfer_buflen;
+       unsigned long xfer_buflen;
 
        unsigned int resid;
        unsigned int error;
@@ -182,7 +196,7 @@ struct fio_file {
         */
        union {
                unsigned long file_data;
-               int fd;
+               long fd;
        };
        char *file_name;
        void *mmap;
diff --git a/init.c b/init.c
index e09a1f9..f4125b1 100644 (file)
--- a/init.c
+++ b/init.c
@@ -81,7 +81,7 @@ static struct fio_option options[] = {
                .help   = "IO engine to use",
                .def    = "sync",
                .posval = { "sync", "libaio", "posixaio", "mmap", "splice",
-                               "sg", "null", "net", },
+                               "sg", "null", "net", "syslet-rw" },
        },
        {
                .name   = "iodepth",
diff --git a/log.c b/log.c
index a705e50..dd63c38 100644 (file)
--- a/log.c
+++ b/log.c
@@ -5,7 +5,7 @@
 
 void write_iolog_put(struct thread_data *td, struct io_u *io_u)
 {
-       fprintf(td->iolog_f, "%u,%llu,%u\n", io_u->ddir, io_u->offset, io_u->buflen);
+       fprintf(td->iolog_f, "%u,%llu,%lu\n", io_u->ddir, io_u->offset, io_u->buflen);
 }
 
 int read_iolog_get(struct thread_data *td, struct io_u *io_u)
index e456ebc..f094d80 100644 (file)
@@ -19,6 +19,7 @@
 #define FIO_HAVE_IOSCHED_SWITCH
 #define FIO_HAVE_ODIRECT
 #define FIO_HAVE_HUGETLB
+#define FIO_HAVE_SYSLET
 
 #define OS_MAP_ANON            (MAP_ANONYMOUS)
 
@@ -73,6 +74,34 @@ static inline int vmsplice(int fd, const struct iovec *iov,
 
 #define SPLICE_DEF_SIZE        (64*1024)
 
+/*
+ * syslet stuff
+ */
+static inline long async_register(void *uah, unsigned int len)
+{
+       return syscall(__NR_async_register, uah, len);
+}
+
+static inline void *async_exec(void *data)
+{
+       return (void *) syscall(__NR_async_exec, data);
+}
+
+static inline long async_wait(unsigned long min_events)
+{
+       return syscall(__NR_async_wait, min_events);
+}
+
+static inline long async_unregister(void *uah, unsigned int len)
+{
+       return syscall(__NR_async_unregister, uah, len);
+}
+
+static inline long umem_add(unsigned long *uptr, unsigned long inc)
+{
+       return syscall(__NR_umem_add, uptr, inc);
+}
+
 enum {
        IOPRIO_WHO_PROCESS = 1,
        IOPRIO_WHO_PGRP,
diff --git a/syslet.h b/syslet.h
new file mode 100644 (file)
index 0000000..85f0ebc
--- /dev/null
+++ b/syslet.h
@@ -0,0 +1,136 @@
+#ifndef _LINUX_SYSLET_H
+#define _LINUX_SYSLET_H
+/*
+ * The syslet subsystem - asynchronous syscall execution support.
+ *
+ * Started by Ingo Molnar:
+ *
+ *  Copyright (C) 2007 Red Hat, Inc., Ingo Molnar <mingo@redhat.com>
+ *
+ * User-space API/ABI definitions:
+ */
+
+/*
+ * This is the 'Syslet Atom' - the basic unit of execution
+ * within the syslet framework. A syslet always represents
+ * a single system-call plus its arguments, plus has conditions
+ * attached to it that allows the construction of larger
+ * programs from these atoms. User-space variables can be used
+ * (for example a loop index) via the special sys_umem*() syscalls.
+ *
+ * Arguments are implemented via pointers to arguments. This not
+ * only increases the flexibility of syslet atoms (multiple syslets
+ * can share the same variable for example), but is also an
+ * optimization: copy_uatom() will only fetch syscall parameters
+ * up until the point it meets the first NULL pointer. 50% of all
+ * syscalls have 2 or less parameters (and 90% of all syscalls have
+ * 4 or less parameters).
+ *
+ * [ Note: since the argument array is at the end of the atom, and the
+ *   kernel will not touch any argument beyond the final NULL one, atoms
+ *   might be packed more tightly. (the only special case exception to
+ *   this rule would be SKIP_TO_NEXT_ON_STOP atoms, where the kernel will
+ *   jump a full syslet_uatom number of bytes.) ]
+ */
+struct syslet_uatom {
+       unsigned long                           flags;
+       unsigned long                           nr;
+       long __user                             *ret_ptr;
+       struct syslet_uatom     __user          *next;
+       unsigned long           __user          *arg_ptr[6];
+       /*
+        * User-space can put anything in here, kernel will not
+        * touch it:
+        */
+       void __user                             *private;
+};
+
+/*
+ * Flags to modify/control syslet atom behavior:
+ */
+
+/*
+ * Immediately queue this syslet asynchronously - do not even
+ * attempt to execute it synchronously in the user context:
+ */
+#define SYSLET_ASYNC                           0x00000001
+
+/*
+ * Never queue this syslet asynchronously - even if synchronous
+ * execution causes a context-switching:
+ */
+#define SYSLET_SYNC                            0x00000002
+
+/*
+ * Do not queue the syslet in the completion ring when done.
+ *
+ * ( the default is that the final atom of a syslet is queued
+ *   in the completion ring. )
+ *
+ * Some syscalls generate implicit completion events of their
+ * own.
+ */
+#define SYSLET_NO_COMPLETE                     0x00000004
+
+/*
+ * Execution control: conditions upon the return code
+ * of the previous syslet atom. 'Stop' means syslet
+ * execution is stopped and the atom is put into the
+ * completion ring:
+ */
+#define SYSLET_STOP_ON_NONZERO                 0x00000008
+#define SYSLET_STOP_ON_ZERO                    0x00000010
+#define SYSLET_STOP_ON_NEGATIVE                        0x00000020
+#define SYSLET_STOP_ON_NON_POSITIVE            0x00000040
+
+#define SYSLET_STOP_MASK                               \
+       (       SYSLET_STOP_ON_NONZERO          |       \
+               SYSLET_STOP_ON_ZERO             |       \
+               SYSLET_STOP_ON_NEGATIVE         |       \
+               SYSLET_STOP_ON_NON_POSITIVE             )
+
+/*
+ * Special modifier to 'stop' handling: instead of stopping the
+ * execution of the syslet, the linearly next syslet is executed.
+ * (Normal execution flows along atom->next, and execution stops
+ *  if atom->next is NULL or a stop condition becomes true.)
+ *
+ * This is what allows true branches of execution within syslets.
+ */
+#define SYSLET_SKIP_TO_NEXT_ON_STOP            0x00000080
+
+/*
+ * This is the (per-user-context) descriptor of the async completion
+ * ring. This gets registered via sys_async_register().
+ */
+struct async_head_user {
+       /*
+        * Pointers to completed async syslets (i.e. syslets that
+        * generated a cachemiss and went async, returning -EASYNCSYSLET
+        * to the user context by sys_async_exec()) are queued here.
+        * Syslets that were executed synchronously are not queued here.
+        *
+        * Note: the final atom that generated the exit condition is
+        * queued here. Normally this would be the last atom of a syslet.
+        */
+       struct syslet_uatom __user              **completion_ring;
+       /*
+        * Ring size in bytes:
+        */
+       unsigned long                           ring_size_bytes;
+
+       /*
+        * Maximum number of asynchronous contexts the kernel creates.
+        *
+        * -1UL has a special meaning: the kernel manages the optimal
+        * size of the async pool.
+        *
+        * Note: this field should be valid for the lifetime of async
+        * processing, because future kernels detect changes to this
+        * field. (enabling user-space to control the size of the async
+        * pool in a low-overhead fashion)
+        */
+       unsigned long                           max_nr_threads;
+};
+
+#endif
index 692eb5b..4440d44 100644 (file)
--- a/verify.c
+++ b/verify.c
@@ -51,7 +51,7 @@ static int verify_io_u_crc32(struct verify_header *hdr, struct io_u *io_u)
        c = crc32(p, hdr->len - sizeof(*hdr));
 
        if (c != hdr->crc32) {
-               log_err("crc32: verify failed at %llu/%u\n", io_u->offset, io_u->buflen);
+               log_err("crc32: verify failed at %llu/%lu\n", io_u->offset, io_u->buflen);
                log_err("crc32: wanted %lx, got %lx\n", hdr->crc32, c);
                return 1;
        }
@@ -69,7 +69,7 @@ static int verify_io_u_md5(struct verify_header *hdr, struct io_u *io_u)
        md5_update(&md5_ctx, p, hdr->len - sizeof(*hdr));
 
        if (memcmp(hdr->md5_digest, md5_ctx.hash, sizeof(md5_ctx.hash))) {
-               log_err("md5: verify failed at %llu/%u\n", io_u->offset, io_u->buflen);
+               log_err("md5: verify failed at %llu/%lu\n", io_u->offset, io_u->buflen);
                hexdump(hdr->md5_digest, sizeof(hdr->md5_digest));
                hexdump(md5_ctx.hash, sizeof(md5_ctx.hash));
                return 1;