options: split out option grouping code
[fio.git] / engines / libhdfs.c
index 658cd6aed3ed840d507e5bc42e2ee220623948b9..faad3f875ac17504eb1cf634f0fc8f8043426ab1 100644 (file)
  *
  * thus, random reads and writes can also be achieved with this logic.
  *
- * NOTE: please set environment variables FIO_HDFS_BS and FIO_HDFS_FCOUNT
- * to appropriate value to work this engine properly
- *
  */
 
-#include <stdio.h>
-#include <stdlib.h>
-#include <unistd.h>
-#include <sys/uio.h>
-#include <errno.h>
-#include <assert.h>
+#include <math.h>
+#include <hdfs.h>
 
 #include "../fio.h"
+#include "../optgroup.h"
 
-#include "hdfs.h"
+#define CHUNCK_NAME_LENGTH_MAX 80
+#define CHUNCK_CREATION_BUFFER_SIZE 65536
 
 struct hdfsio_data {
-       char host[256];
-       int port;
        hdfsFS fs;
        hdfsFile fp;
-       unsigned long fsbs;
-       unsigned long fscount;
-       unsigned long curr_file_id;
-       unsigned int numjobs;
-       unsigned int fid_correction;
+       uint64_t curr_file_id;
 };
 
-static int fio_hdfsio_setup_fs_params(struct hdfsio_data *hd)
-{
-       /* make sure that hdfsConnect is invoked before executing this function */
-       hdfsSetWorkingDirectory(hd->fs, "/.perftest");
-       hd->fp = hdfsOpenFile(hd->fs, ".fcount", O_RDONLY, 0, 0, 0);
-       if (hd->fp) {
-               hdfsRead(hd->fs, hd->fp, &(hd->fscount), sizeof(hd->fscount));
-               hdfsCloseFile(hd->fs, hd->fp);
-       }
-       hd->fp = hdfsOpenFile(hd->fs, ".fbs", O_RDONLY, 0, 0, 0);
-       if (hd->fp) {
-               hdfsRead(hd->fs, hd->fp, &(hd->fsbs), sizeof(hd->fsbs));
-               hdfsCloseFile(hd->fs, hd->fp);
-       }
+struct hdfsio_options {
+       void *pad;                      /* needed because offset can't be 0 for an option defined using offsetof */
+       char *host;
+       char *directory;
+       unsigned int port;
+       unsigned int chunck_size;
+       unsigned int single_instance;
+       unsigned int use_direct;
+};
 
-       return 0;
+static struct fio_option options[] = {
+       {
+               .name   = "namenode",
+               .lname  = "hfds namenode",
+               .type   = FIO_OPT_STR_STORE,
+               .off1   = offsetof(struct hdfsio_options, host),
+               .def    = "localhost",
+               .help   = "Namenode of the HDFS cluster",
+               .category = FIO_OPT_C_ENGINE,
+               .group  = FIO_OPT_G_HDFS,
+       },
+       {
+               .name   = "hostname",
+               .lname  = "hfds namenode",
+               .type   = FIO_OPT_STR_STORE,
+               .off1   = offsetof(struct hdfsio_options, host),
+               .def    = "localhost",
+               .help   = "Namenode of the HDFS cluster",
+               .category = FIO_OPT_C_ENGINE,
+               .group  = FIO_OPT_G_HDFS,
+       },
+       {
+               .name   = "port",
+               .lname  = "hdfs namenode port",
+               .type   = FIO_OPT_INT,
+               .off1   = offsetof(struct hdfsio_options, port),
+               .def    = "9000",
+               .minval = 1,
+               .maxval = 65535,
+               .help   = "Port used by the HDFS cluster namenode",
+               .category = FIO_OPT_C_ENGINE,
+               .group  = FIO_OPT_G_HDFS,
+       },
+       {
+               .name   = "hdfsdirectory",
+               .lname  = "hfds directory",
+               .type   = FIO_OPT_STR_STORE,
+               .off1   = offsetof(struct hdfsio_options, directory),
+               .def    = "/",
+               .help   = "The HDFS directory where fio will create chuncks",
+               .category = FIO_OPT_C_ENGINE,
+               .group  = FIO_OPT_G_HDFS,
+       },
+       {
+               .name   = "chunck_size",
+               .type   = FIO_OPT_INT,
+               .off1   = offsetof(struct hdfsio_options, chunck_size),
+               .def    = "1048576",
+               .help   = "Size of individual chunck",
+               .category = FIO_OPT_C_ENGINE,
+               .group  = FIO_OPT_G_HDFS,
+       },
+       {
+               .name   = "single_instance",
+               .type   = FIO_OPT_BOOL,
+               .off1   = offsetof(struct hdfsio_options, single_instance),
+               .def    = "1",
+               .help   = "Use a single instance",
+               .category = FIO_OPT_C_ENGINE,
+               .group  = FIO_OPT_G_HDFS,
+       },
+       {
+               .name   = "hdfs_use_direct",
+               .type   = FIO_OPT_BOOL,
+               .off1   = offsetof(struct hdfsio_options, use_direct),
+               .def    = "0",
+               .help   = "Use readDirect instead of hdfsRead",
+               .category = FIO_OPT_C_ENGINE,
+               .group  = FIO_OPT_G_HDFS,
+       },
+       {
+               .name   = NULL,
+       },
+};
+
+
+static int get_chunck_name(char *dest, char *file_name, uint64_t chunk_id) {
+       return snprintf(dest, CHUNCK_NAME_LENGTH_MAX, "%s_%lu", file_name, chunk_id);
 }
 
 static int fio_hdfsio_prep(struct thread_data *td, struct io_u *io_u)
 {
-       struct hdfsio_data *hd;
-       hdfsFileInfo *fi;
+       struct hdfsio_options *options = td->eo;
+       struct hdfsio_data *hd = td->io_ops->data;
        unsigned long f_id;
-       char fname[80];
-       int open_flags = 0;
-
-       hd = td->io_ops->data;
-
-       if (hd->curr_file_id == -1) {
-               /* see comment in fio_hdfsio_setup() function */
-               fio_hdfsio_setup_fs_params(hd);
-       }
+       char fname[CHUNCK_NAME_LENGTH_MAX];
+       int open_flags;
 
        /* find out file id based on the offset generated by fio */
-       f_id = (io_u->offset / hd->fsbs) + hd->fid_correction;
+       f_id = floor(io_u->offset / options-> chunck_size);
 
        if (f_id == hd->curr_file_id) {
                /* file is already open */
@@ -79,46 +133,76 @@ static int fio_hdfsio_prep(struct thread_data *td, struct io_u *io_u)
        }
 
        if (hd->curr_file_id != -1) {
-               hdfsCloseFile(hd->fs, hd->fp);
+               if ( hdfsCloseFile(hd->fs, hd->fp) == -1) {
+                       log_err("hdfs: unable to close file: %s\n", strerror(errno));
+                       return errno;
+               }
+               hd->curr_file_id = -1;
        }
 
-       if (io_u->ddir == DDIR_READ) {
+       if (io_u->ddir == DDIR_READ || io_u->ddir == DDIR_SYNC) {
                open_flags = O_RDONLY;
        } else if (io_u->ddir == DDIR_WRITE) {
                open_flags = O_WRONLY;
        } else {
                log_err("hdfs: Invalid I/O Operation\n");
+               return 0;
+       }
+       
+       get_chunck_name(fname, io_u->file->file_name, f_id);
+       hd->fp = hdfsOpenFile(hd->fs, fname, open_flags, 0, 0,
+                             options->chunck_size);
+       if(hd->fp == NULL) {
+               log_err("hdfs: unable to open file: %s: %d\n", fname, strerror(errno));
+               return errno;
        }
-
        hd->curr_file_id = f_id;
-       do {
-               sprintf(fname, ".f%lu", f_id);
-               fi = hdfsGetPathInfo(hd->fs, fname);
-               if (fi->mSize >= hd->fsbs || io_u->ddir == DDIR_WRITE) {
-                       /* file has enough data to read OR file is opened in write mode */
-                       hd->fp =
-                           hdfsOpenFile(hd->fs, fname, open_flags, 0, 0,
-                                        hd->fsbs);
-                       if (hd->fp) {
-                               break;
-                       }
-               }
-               /* file is empty, so try next file for reading */
-               f_id = (f_id + 1) % hd->fscount;
-       } while (1);
 
        return 0;
 }
 
-static int fio_io_end(struct thread_data *td, struct io_u *io_u, int ret)
+static int fio_hdfsio_queue(struct thread_data *td, struct io_u *io_u)
 {
+       struct hdfsio_data *hd = td->io_ops->data;
+       struct hdfsio_options *options = td->eo;
+       int ret;
+       unsigned long offset;
+       
+       offset = io_u->offset % options->chunck_size;
+       
+       if( (io_u->ddir == DDIR_READ || io_u->ddir == DDIR_WRITE) && 
+            hdfsTell(hd->fs, hd->fp) != offset && hdfsSeek(hd->fs, hd->fp, offset) != 0 ) {
+               log_err("hdfs: seek failed: %s, are you doing random write smaller than chunck size ?\n", strerror(errno));
+               io_u->error = errno;
+               return FIO_Q_COMPLETED;
+       };
+
+       // do the IO
+       if (io_u->ddir == DDIR_READ) {
+               if (options->use_direct) {
+                       ret = readDirect(hd->fs, hd->fp, io_u->xfer_buf, io_u->xfer_buflen);
+               } else {
+                       ret = hdfsRead(hd->fs, hd->fp, io_u->xfer_buf, io_u->xfer_buflen);
+               }
+       } else if (io_u->ddir == DDIR_WRITE) {
+               ret = hdfsWrite(hd->fs, hd->fp, io_u->xfer_buf,
+                               io_u->xfer_buflen);
+       } else if (io_u->ddir == DDIR_SYNC) {
+               ret = hdfsFlush(hd->fs, hd->fp);
+       } else {
+               log_err("hdfs: Invalid I/O Operation: %d\n", io_u->ddir);
+               ret = EINVAL;
+       }
+
+       // Check if the IO went fine, or is incomplete
        if (ret != (int)io_u->xfer_buflen) {
                if (ret >= 0) {
                        io_u->resid = io_u->xfer_buflen - ret;
                        io_u->error = 0;
                        return FIO_Q_COMPLETED;
-               } else
+               } else {
                        io_u->error = errno;
+               }
        }
 
        if (io_u->error)
@@ -127,107 +211,200 @@ static int fio_io_end(struct thread_data *td, struct io_u *io_u, int ret)
        return FIO_Q_COMPLETED;
 }
 
-static int fio_hdfsio_queue(struct thread_data *td, struct io_u *io_u)
+int fio_hdfsio_open_file(struct thread_data *td, struct fio_file *f)
 {
-       struct hdfsio_data *hd;
-       int ret = 0;
-
-       hd = td->io_ops->data;
-
-       if (io_u->ddir == DDIR_READ) {
-               ret =
-                   hdfsRead(hd->fs, hd->fp, io_u->xfer_buf, io_u->xfer_buflen);
-       } else if (io_u->ddir == DDIR_WRITE) {
-               ret =
-                   hdfsWrite(hd->fs, hd->fp, io_u->xfer_buf,
-                             io_u->xfer_buflen);
-       } else {
-               log_err("hdfs: Invalid I/O Operation\n");
+       if (td->o.odirect) {
+               td->error = EINVAL;
+               return 0;
        }
 
-       return fio_io_end(td, io_u, ret);
+       return 0;
 }
 
-int fio_hdfsio_open_file(struct thread_data *td, struct fio_file *f)
+int fio_hdfsio_close_file(struct thread_data *td, struct fio_file *f)
 {
-       struct hdfsio_data *hd;
-
-       hd = td->io_ops->data;
-       hd->fs = hdfsConnect(hd->host, hd->port);
-       hdfsSetWorkingDirectory(hd->fs, "/.perftest");
-       hd->fid_correction = (getpid() % hd->numjobs);
+       struct hdfsio_data *hd = td->io_ops->data;
 
+       if (hd->curr_file_id != -1) {
+               if ( hdfsCloseFile(hd->fs, hd->fp) == -1) {
+                       log_err("hdfs: unable to close file: %s\n", strerror(errno));
+                       return errno;
+               }
+               hd->curr_file_id = -1;
+       }
        return 0;
 }
 
-int fio_hdfsio_close_file(struct thread_data *td, struct fio_file *f)
+static int fio_hdfsio_init(struct thread_data *td)
 {
-       struct hdfsio_data *hd;
-
-       hd = td->io_ops->data;
-       hdfsDisconnect(hd->fs);
+       struct hdfsio_options *options = td->eo;
+       struct hdfsio_data *hd = td->io_ops->data;
+       struct fio_file *f;
+       uint64_t j,k;
+       int i, failure = 0;
+       uint8_t buffer[CHUNCK_CREATION_BUFFER_SIZE];
+       uint64_t bytes_left;    
+       char fname[CHUNCK_NAME_LENGTH_MAX];     
+       hdfsFile fp;
+       hdfsFileInfo *fi;
+       tOffset fi_size;
+
+       for_each_file(td, f, i) {
+               k = 0;
+               for(j=0; j < f->real_file_size; j += options->chunck_size) {
+                       get_chunck_name(fname, f->file_name, k++);
+                       fi = hdfsGetPathInfo(hd->fs, fname);
+                       fi_size = fi ? fi->mSize : 0;
+                       // file exists and is big enough, nothing to do
+                       if( fi && fi_size >= options->chunck_size) {
+                               continue;
+                       }
+                       fp = hdfsOpenFile(hd->fs, fname, O_WRONLY, 0, 0,
+                                         options->chunck_size);
+                       if(fp == NULL) {
+                               failure = errno;
+                               log_err("hdfs: unable to prepare file chunk %s: %s\n", fname, strerror(errno));
+                               break;
+                       }
+                       bytes_left = options->chunck_size;
+                       memset(buffer, 0, CHUNCK_CREATION_BUFFER_SIZE);
+                       while( bytes_left > CHUNCK_CREATION_BUFFER_SIZE) {
+                               if( hdfsWrite(hd->fs, fp, buffer, CHUNCK_CREATION_BUFFER_SIZE)
+                                   != CHUNCK_CREATION_BUFFER_SIZE) {
+                                       failure = errno;
+                                       log_err("hdfs: unable to prepare file chunk %s: %s\n", fname, strerror(errno));
+                                       break;
+                               };
+                               bytes_left -= CHUNCK_CREATION_BUFFER_SIZE;
+                       }
+                       if(bytes_left > 0) {
+                               if( hdfsWrite(hd->fs, fp, buffer, bytes_left)
+                                   != bytes_left) {
+                                       failure = errno;
+                                       break;
+                               };
+                       }
+                       if( hdfsCloseFile(hd->fs, fp) != 0) {
+                               failure = errno;
+                               log_err("hdfs: unable to prepare file chunk %s: %s\n", fname, strerror(errno));
+                               break;
+                       }
+               }
+               if(failure) {
+                       break;
+               }
+       }
+       
+       if( !failure ) {
+               fio_file_set_size_known(f);
+       }
 
-       return 0;
+       return failure;
 }
 
 static int fio_hdfsio_setup(struct thread_data *td)
 {
        struct hdfsio_data *hd;
        struct fio_file *f;
-       static unsigned int numjobs = 1;        /* atleast one job has to be there! */
-       numjobs = (td->o.numjobs > numjobs) ? td->o.numjobs : numjobs;
+       int i;
+       uint64_t file_size, total_file_size;
 
        if (!td->io_ops->data) {
-               hd = malloc(sizeof(*hd));;
-
+               hd = malloc(sizeof(*hd));
                memset(hd, 0, sizeof(*hd));
-               td->io_ops->data = hd;
+               
+               hd->curr_file_id = -1;
 
-               /* separate host and port from filename */
-               *(strchr(td->o.filename, ',')) = ' ';
-               sscanf(td->o.filename, "%s%d", hd->host, &(hd->port));
-
-               /* read fbs and fcount and based on that set f->real_file_size */
-               f = td->files[0];
-#if 0
-               /* IMHO, this should be done here instead of fio_hdfsio_prep()
-                * but somehow calling it here doesn't seem to work,
-                * some problem with libhdfs that needs to be debugged */
-               hd->fs = hdfsConnect(hd->host, hd->port);
-               fio_hdfsio_setup_fs_params(hd);
-               hdfsDisconnect(hd->fs);
-#else
-               /* so, as an alternate, using environment variables */
-               if (getenv("FIO_HDFS_FCOUNT") && getenv("FIO_HDFS_BS")) {
-                       hd->fscount = atol(getenv("FIO_HDFS_FCOUNT"));
-                       hd->fsbs = atol(getenv("FIO_HDFS_BS"));
-               } else {
-                       log_err("FIO_HDFS_FCOUNT and/or FIO_HDFS_BS not set.\n");
-                       return 1;
+               td->io_ops->data = hd;
+       }
+       
+       total_file_size = 0;
+       file_size = 0;
+
+       for_each_file(td, f, i) {
+               if(!td->o.file_size_low) {
+                       file_size = floor(td->o.size / td->o.nr_files);
+                       total_file_size += file_size;
                }
-#endif
-               f->real_file_size = hd->fscount * hd->fsbs;
-
-               td->o.nr_files = 1;
-               hd->curr_file_id = -1;
-               hd->numjobs = numjobs;
-               fio_file_set_size_known(f);
+               else if (td->o.file_size_low == td->o.file_size_high)
+                       file_size = td->o.file_size_low;
+               else {
+                       file_size = get_rand_file_size(td);
+               }
+               f->real_file_size = file_size;
        }
+       /* If the size doesn't divide evenly by the chunck size,
+        * make the last files bigger.
+        * Used only if filesize was not explicitly given
+        */
+       if (!td->o.file_size_low && total_file_size < td->o.size) {
+               f->real_file_size += (td->o.size - total_file_size);
+       }
+
+       return 0;
+}
 
+static int fio_hdfsio_io_u_init(struct thread_data *td, struct io_u *io_u)
+{
+       struct hdfsio_data *hd = td->io_ops->data;
+       struct hdfsio_options *options = td->eo;
+       int failure;
+       struct hdfsBuilder *bld;
+
+       if (options->host == NULL || options->port == 0) {
+               log_err("hdfs: server not defined\n");
+               return EINVAL;
+       }
+       
+       bld = hdfsNewBuilder();
+       if (!bld) {
+               failure = errno;
+               log_err("hdfs: unable to allocate connect builder\n");
+               return failure;
+       }
+       hdfsBuilderSetNameNode(bld, options->host);
+       hdfsBuilderSetNameNodePort(bld, options->port);
+       if(! options->single_instance) {
+               hdfsBuilderSetForceNewInstance(bld);
+       }
+       hd->fs = hdfsBuilderConnect(bld);
+       
+       /* hdfsSetWorkingDirectory succeeds on a nonexistent directory */
+       if (hdfsExists(hd->fs, options->directory) < 0 || hdfsSetWorkingDirectory(hd->fs, options->directory) < 0) {
+               failure = errno;
+               log_err("hdfs: invalid working directory %s: %s\n", options->directory, strerror(errno));
+               return failure;
+       }
+       
        return 0;
 }
 
+static void fio_hdfsio_io_u_free(struct thread_data *td, struct io_u *io_u)
+{
+       struct hdfsio_data *hd = td->io_ops->data;
+
+       if (hd->fs && hdfsDisconnect(hd->fs) < 0) {
+               log_err("hdfs: disconnect failed: %d\n", errno);
+       }
+}
+
 static struct ioengine_ops ioengine_hdfs = {
        .name = "libhdfs",
        .version = FIO_IOOPS_VERSION,
+       .flags = FIO_SYNCIO | FIO_DISKLESSIO | FIO_NODISKUTIL,
        .setup = fio_hdfsio_setup,
+       .init = fio_hdfsio_init,
        .prep = fio_hdfsio_prep,
        .queue = fio_hdfsio_queue,
        .open_file = fio_hdfsio_open_file,
        .close_file = fio_hdfsio_close_file,
-       .flags = FIO_SYNCIO,
+       .io_u_init = fio_hdfsio_io_u_init,
+       .io_u_free = fio_hdfsio_io_u_free,
+       .option_struct_size     = sizeof(struct hdfsio_options),
+       .options                = options,
 };
 
+
 static void fio_init fio_hdfsio_register(void)
 {
        register_ioengine(&ioengine_hdfs);