Add support for HDFS IO engine
authorManish Mandlik <manishm@fb.com>
Wed, 13 Aug 2014 19:36:52 +0000 (13:36 -0600)
committerJens Axboe <axboe@fb.com>
Wed, 13 Aug 2014 19:36:52 +0000 (13:36 -0600)
Signed-off-by: Jens Axboe <axboe@fb.com>
HOWTO
Makefile
configure
engines/libhdfs.c [new file with mode: 0644]
fio.1
options.c

diff --git a/HOWTO b/HOWTO
index 87346ae5acee4cdf62497adbbc556a9f88289c8c..d7283535db0d8ddff7908422185fe5cdb9480b76 100644 (file)
--- a/HOWTO
+++ b/HOWTO
@@ -694,6 +694,8 @@ ioengine=str        Defines how the job issues io to the file. The following
                                having to go through FUSE. This ioengine
                                defines engine specific options.
 
+                       hdfs    Read and write through Hadoop (HDFS).
+
                        external Prefix to specify loading an external
                                IO engine object file. Append the engine
                                filename, eg ioengine=external:/tmp/foo.o
index 65e95be6c9f4a5a485075e09eccea7033c62356e..8d8626928fcc03df6022b0fe2657708f15f88181 100644 (file)
--- a/Makefile
+++ b/Makefile
@@ -38,6 +38,13 @@ SOURCE := gettime.c ioengines.c init.c stat.c log.c time.c filesetup.c \
                profiles/tiobench.c profiles/act.c io_u_queue.c filelock.c \
                lib/tp.c
 
+ifdef CONFIG_LIBHDFS
+  HDFSFLAGS= -I $(JAVA_HOME)/include -I $(JAVA_HOME)/include/linux -I $(FIO_LIBHDFS_INCLUDE)
+  HDFSLIB= $(JAVA_HOME)/jre/lib/amd64/server/libjvm.so $(FIO_LIBHDFS_LIB)/liblibhdfs.a
+  CFLAGS += $(HDFSFLAGS)
+  SOURCE += engines/libhdfs.c
+endif
+
 ifdef CONFIG_64BIT_LLP64
   CFLAGS += -DBITS_PER_LONG=32
 endif
@@ -268,7 +275,7 @@ t/ieee754: $(T_IEEE_OBJS)
        $(QUIET_LINK)$(CC) $(LDFLAGS) $(CFLAGS) -o $@ $(T_IEEE_OBJS) $(LIBS)
 
 fio: $(FIO_OBJS)
-       $(QUIET_LINK)$(CC) $(LDFLAGS) $(CFLAGS) -o $@ $(FIO_OBJS) $(LIBS)
+       $(QUIET_LINK)$(CC) $(LDFLAGS) $(CFLAGS) -o $@ $(FIO_OBJS) $(LIBS) $(HDFSLIB)
 
 gfio: $(GFIO_OBJS)
        $(QUIET_LINK)$(CC) $(LDFLAGS) -o gfio $(GFIO_OBJS) $(LIBS) $(GTK_LDFLAGS)
index 1494dd766b87e9e2d181f9fa2ef62e00a5001b7a..33d1327ebbba5b70a001e422bb5ad9b24d7c7b49 100755 (executable)
--- a/configure
+++ b/configure
@@ -134,6 +134,7 @@ cpu=""
 show_help="no"
 exit_val=0
 gfio="no"
+libhdfs="no"
 
 # parse options
 for opt do
@@ -160,6 +161,8 @@ for opt do
   ;;
   --disable-gfapi) disable_gfapi="yes"
   ;;
+  --enable-libhdfs) libhdfs="yes"
+  ;;
   --help)
     show_help="yes"
     ;;
@@ -178,6 +181,7 @@ if test "$show_help" = "yes" ; then
   echo "--esx                  Configure build options for esx"
   echo "--enable-gfio          Enable building of gtk gfio"
   echo "--disable-numa         Disable libnuma even if found"
+  echo "--enable-libhdfs       Enable hdfs support"
   exit $exit_val
 fi
 
@@ -1243,6 +1247,29 @@ if compile_prog "" "" "s390_z196_facilities"; then
   fi
 fi
 echo "s390_z196_facilities          $s390_z196_facilities"
+
+##########################################
+# Check if we have required environment variables configured for libhdfs
+if test "$libhdfs" = "yes" ; then
+  hdfs_conf_error=0
+  if test "$JAVA_HOME" = "" ; then
+    echo "configure: JAVA_HOME should be defined to jdk/jvm path"
+    hdfs_conf_error=1
+  fi
+  if test "$FIO_LIBHDFS_INCLUDE" = "" ; then
+    echo "configure: FIO_LIBHDFS_INCLUDE should be defined to libhdfs inlude path"
+    hdfs_conf_error=1
+  fi
+  if test "$FIO_LIBHDFS_LIB" = "" ; then
+    echo "configure: FIO_LIBHDFS_LIB should be defined to libhdfs library path"
+    hdfs_conf_error=1
+  fi
+  if test "$hdfs_conf_error" = "1" ; then
+    exit 1
+  fi
+fi
+echo "HDFS engine                   $libhdfs"
+
 #############################################################################
 
 if test "$wordsize" = "64" ; then
@@ -1384,6 +1411,9 @@ fi
 if test "$gf_fadvise" = "yes" ; then
   output_sym "CONFIG_GF_FADVISE"
 fi
+if test "$libhdfs" = "yes" ; then
+  output_sym "CONFIG_LIBHDFS"
+fi
 
 if test "$zlib" = "no" ; then
   echo "Consider installing zlib-dev (zlib-devel), some fio features depend on it."
diff --git a/engines/libhdfs.c b/engines/libhdfs.c
new file mode 100644 (file)
index 0000000..773d46b
--- /dev/null
@@ -0,0 +1,240 @@
+/*
+ * libhdfs engine
+ *
+ * this engine helps perform read/write operations on hdfs cluster using
+ * libhdfs. hdfs doesnot support modification of data once file is created.
+ *
+ * so to mimic that create many files of small size (e.g 256k), and this
+ * engine select a file based on the offset generated by fio.
+ *
+ * thus, random reads and writes can also be achieved with this logic.
+ *
+ * NOTE: please set environment variables FIO_HDFS_BS and FIO_HDFS_FCOUNT
+ * to appropriate value to work this engine properly
+ *
+ */
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <sys/uio.h>
+#include <errno.h>
+#include <assert.h>
+
+#include "../fio.h"
+
+#include "hdfs.h"
+
+struct hdfsio_data {
+       char host[256];
+       int port;
+       hdfsFS fs;
+       hdfsFile fp;
+       unsigned long fsbs;
+       unsigned long fscount;
+       unsigned long curr_file_id;
+       unsigned int numjobs;
+       unsigned int fid_correction;
+};
+
+static int fio_hdfsio_setup_fs_params(struct hdfsio_data *hd)
+{
+       /* make sure that hdfsConnect is invoked before executing this function */
+       hdfsSetWorkingDirectory(hd->fs, "/.perftest");
+       hd->fp = hdfsOpenFile(hd->fs, ".fcount", O_RDONLY, 0, 0, 0);
+       if (hd->fp) {
+               hdfsRead(hd->fs, hd->fp, &(hd->fscount), sizeof(hd->fscount));
+               hdfsCloseFile(hd->fs, hd->fp);
+       }
+       hd->fp = hdfsOpenFile(hd->fs, ".fbs", O_RDONLY, 0, 0, 0);
+       if (hd->fp) {
+               hdfsRead(hd->fs, hd->fp, &(hd->fsbs), sizeof(hd->fsbs));
+               hdfsCloseFile(hd->fs, hd->fp);
+       }
+
+       return 0;
+}
+
+static int fio_hdfsio_prep(struct thread_data *td, struct io_u *io_u)
+{
+       struct hdfsio_data *hd;
+       hdfsFileInfo *fi;
+       unsigned long f_id;
+       char fname[80];
+       int open_flags = 0;
+
+       hd = td->io_ops->data;
+
+       if (hd->curr_file_id == -1) {
+               /* see comment in fio_hdfsio_setup() function */
+               fio_hdfsio_setup_fs_params(hd);
+       }
+
+       /* find out file id based on the offset generated by fio */
+       f_id = (io_u->offset / hd->fsbs) + hd->fid_correction;
+
+       if (f_id == hd->curr_file_id) {
+               /* file is already open */
+               return 0;
+       }
+
+       if (hd->curr_file_id != -1) {
+               hdfsCloseFile(hd->fs, hd->fp);
+       }
+
+       if (io_u->ddir == DDIR_READ) {
+               open_flags = O_RDONLY;
+       } else if (io_u->ddir == DDIR_WRITE) {
+               open_flags = O_WRONLY;
+       } else {
+               printf("Invalid I/O Operation\n");
+       }
+
+       hd->curr_file_id = f_id;
+       do {
+               sprintf(fname, ".f%lu", f_id);
+               fi = hdfsGetPathInfo(hd->fs, fname);
+               if (fi->mSize >= hd->fsbs || io_u->ddir == DDIR_WRITE) {
+                       /* file has enough data to read OR file is opened in write mode */
+                       hd->fp =
+                           hdfsOpenFile(hd->fs, fname, open_flags, 0, 0,
+                                        hd->fsbs);
+                       if (hd->fp) {
+                               break;
+                       }
+               }
+               /* file is empty, so try next file for reading */
+               f_id = (f_id + 1) % hd->fscount;
+       } while (1);
+
+       return 0;
+}
+
+static int fio_io_end(struct thread_data *td, struct io_u *io_u, int ret)
+{
+       if (ret != (int)io_u->xfer_buflen) {
+               if (ret >= 0) {
+                       io_u->resid = io_u->xfer_buflen - ret;
+                       io_u->error = 0;
+                       return FIO_Q_COMPLETED;
+               } else
+                       io_u->error = errno;
+       }
+
+       if (io_u->error)
+               td_verror(td, io_u->error, "xfer");
+
+       return FIO_Q_COMPLETED;
+}
+
+static int fio_hdfsio_queue(struct thread_data *td, struct io_u *io_u)
+{
+       struct hdfsio_data *hd;
+       int ret = 0;
+
+       hd = td->io_ops->data;
+
+       if (io_u->ddir == DDIR_READ) {
+               ret =
+                   hdfsRead(hd->fs, hd->fp, io_u->xfer_buf, io_u->xfer_buflen);
+       } else if (io_u->ddir == DDIR_WRITE) {
+               ret =
+                   hdfsWrite(hd->fs, hd->fp, io_u->xfer_buf,
+                             io_u->xfer_buflen);
+       } else {
+               printf("Invalid I/O Operation\n");
+       }
+
+       return fio_io_end(td, io_u, ret);
+}
+
+int fio_hdfsio_open_file(struct thread_data *td, struct fio_file *f)
+{
+       struct hdfsio_data *hd;
+
+       hd = td->io_ops->data;
+       hd->fs = hdfsConnect(hd->host, hd->port);
+       hdfsSetWorkingDirectory(hd->fs, "/.perftest");
+       hd->fid_correction = (getpid() % hd->numjobs);
+
+       return 0;
+}
+
+int fio_hdfsio_close_file(struct thread_data *td, struct fio_file *f)
+{
+       struct hdfsio_data *hd;
+
+       hd = td->io_ops->data;
+       hdfsDisconnect(hd->fs);
+
+       return 0;
+}
+
+static int fio_hdfsio_setup(struct thread_data *td)
+{
+       struct hdfsio_data *hd;
+       struct fio_file *f;
+       static unsigned int numjobs = 1;        /* atleast one job has to be there! */
+       numjobs = (td->o.numjobs > numjobs) ? td->o.numjobs : numjobs;
+
+       if (!td->io_ops->data) {
+               hd = malloc(sizeof(*hd));;
+
+               memset(hd, 0, sizeof(*hd));
+               td->io_ops->data = hd;
+
+               /* separate host and port from filename */
+               *(strchr(td->o.filename, ',')) = ' ';
+               sscanf(td->o.filename, "%s%d", hd->host, &(hd->port));
+
+               /* read fbs and fcount and based on that set f->real_file_size */
+               f = td->files[0];
+#if 0
+               /* IMHO, this should be done here instead of fio_hdfsio_prep()
+                * but somehow calling it here doesn't seem to work,
+                * some problem with libhdfs that needs to be debugged */
+               hd->fs = hdfsConnect(hd->host, hd->port);
+               fio_hdfsio_setup_fs_params(hd);
+               hdfsDisconnect(hd->fs);
+#else
+               /* so, as an alternate, using environment variables */
+               if (getenv("FIO_HDFS_FCOUNT") && getenv("FIO_HDFS_BS")) {
+                       hd->fscount = atol(getenv("FIO_HDFS_FCOUNT"));
+                       hd->fsbs = atol(getenv("FIO_HDFS_BS"));
+               } else {
+                       fprintf(stderr,
+                               "FIO_HDFS_FCOUNT and/or FIO_HDFS_BS not set.\n");
+                       return 1;
+               }
+#endif
+               f->real_file_size = hd->fscount * hd->fsbs;
+
+               td->o.nr_files = 1;
+               hd->curr_file_id = -1;
+               hd->numjobs = numjobs;
+               fio_file_set_size_known(f);
+       }
+
+       return 0;
+}
+
+static struct ioengine_ops ioengine_hdfs = {
+       .name = "libhdfs",
+       .version = FIO_IOOPS_VERSION,
+       .setup = fio_hdfsio_setup,
+       .prep = fio_hdfsio_prep,
+       .queue = fio_hdfsio_queue,
+       .open_file = fio_hdfsio_open_file,
+       .close_file = fio_hdfsio_close_file,
+       .flags = FIO_SYNCIO,
+};
+
+static void fio_init fio_hdfsio_register(void)
+{
+       register_ioengine(&ioengine_hdfs);
+}
+
+static void fio_exit fio_hdfsio_unregister(void)
+{
+       unregister_ioengine(&ioengine_hdfs);
+}
diff --git a/fio.1 b/fio.1
index 5291126401a34fdd836137cc9694dc7b5574255d..b5ff3ccbc4633dd0cf92ad42a59efe660d64e848 100644 (file)
--- a/fio.1
+++ b/fio.1
@@ -612,6 +612,9 @@ options.
 Using Glusterfs libgfapi async interface to direct access to Glusterfs volumes without
 having to go through FUSE. This ioengine defines engine specific
 options.
+.TP
+.B hdfs
+Read and write through Hadoop (HDFS)
 .RE
 .P
 .RE
index 3a3321f7b9d6a3d9b0b5bbf4b8da1d283182c3e3..484efc1a2ebe8dd438dceff5a2a1307c8eaf7c0b 100644 (file)
--- a/options.c
+++ b/options.c
@@ -1541,7 +1541,11 @@ struct fio_option fio_options[FIO_MAX_OPTS] = {
                            .help = "Glusterfs libgfapi(async) based engine"
                          },
 #endif
-
+#ifdef CONFIG_LIBHDFS
+                         { .ival = "hdfs",
+                           .help = "Hadoop Distributed Filesystem (HDFS) engine"
+                         },
+#endif
                          { .ival = "external",
                            .help = "Load external engine (append name)",
                          },