X-Git-Url: https://git.kernel.dk/?p=fio.git;a=blobdiff_plain;f=engines%2Flibhdfs.c;h=faad3f875ac17504eb1cf634f0fc8f8043426ab1;hp=658cd6aed3ed840d507e5bc42e2ee220623948b9;hb=d220c761f78bc04bf34355560a0b6b7b85fba0e8;hpb=7d4a8e7e2b16b900891b886295d1e8493f853b0c;ds=sidebyside diff --git a/engines/libhdfs.c b/engines/libhdfs.c index 658cd6ae..faad3f87 100644 --- a/engines/libhdfs.c +++ b/engines/libhdfs.c @@ -9,69 +9,123 @@ * * thus, random reads and writes can also be achieved with this logic. * - * NOTE: please set environment variables FIO_HDFS_BS and FIO_HDFS_FCOUNT - * to appropriate value to work this engine properly - * */ -#include -#include -#include -#include -#include -#include +#include +#include #include "../fio.h" +#include "../optgroup.h" -#include "hdfs.h" +#define CHUNCK_NAME_LENGTH_MAX 80 +#define CHUNCK_CREATION_BUFFER_SIZE 65536 struct hdfsio_data { - char host[256]; - int port; hdfsFS fs; hdfsFile fp; - unsigned long fsbs; - unsigned long fscount; - unsigned long curr_file_id; - unsigned int numjobs; - unsigned int fid_correction; + uint64_t curr_file_id; }; -static int fio_hdfsio_setup_fs_params(struct hdfsio_data *hd) -{ - /* make sure that hdfsConnect is invoked before executing this function */ - hdfsSetWorkingDirectory(hd->fs, "/.perftest"); - hd->fp = hdfsOpenFile(hd->fs, ".fcount", O_RDONLY, 0, 0, 0); - if (hd->fp) { - hdfsRead(hd->fs, hd->fp, &(hd->fscount), sizeof(hd->fscount)); - hdfsCloseFile(hd->fs, hd->fp); - } - hd->fp = hdfsOpenFile(hd->fs, ".fbs", O_RDONLY, 0, 0, 0); - if (hd->fp) { - hdfsRead(hd->fs, hd->fp, &(hd->fsbs), sizeof(hd->fsbs)); - hdfsCloseFile(hd->fs, hd->fp); - } +struct hdfsio_options { + void *pad; /* needed because offset can't be 0 for a option defined used offsetof */ + char *host; + char *directory; + unsigned int port; + unsigned int chunck_size; + unsigned int single_instance; + unsigned int use_direct; +}; - return 0; +static struct fio_option options[] = { + { + .name = "namenode", + .lname = "hfds namenode", + .type = FIO_OPT_STR_STORE, + .off1 = offsetof(struct hdfsio_options, host), + .def = "localhost", + .help = "Namenode of the HDFS cluster", + .category = FIO_OPT_C_ENGINE, + .group = FIO_OPT_G_HDFS, + }, + { + .name = "hostname", + .lname = "hfds namenode", + .type = FIO_OPT_STR_STORE, + .off1 = offsetof(struct hdfsio_options, host), + .def = "localhost", + .help = "Namenode of the HDFS cluster", + .category = FIO_OPT_C_ENGINE, + .group = FIO_OPT_G_HDFS, + }, + { + .name = "port", + .lname = "hdfs namenode port", + .type = FIO_OPT_INT, + .off1 = offsetof(struct hdfsio_options, port), + .def = "9000", + .minval = 1, + .maxval = 65535, + .help = "Port used by the HDFS cluster namenode", + .category = FIO_OPT_C_ENGINE, + .group = FIO_OPT_G_HDFS, + }, + { + .name = "hdfsdirectory", + .lname = "hfds directory", + .type = FIO_OPT_STR_STORE, + .off1 = offsetof(struct hdfsio_options, directory), + .def = "/", + .help = "The HDFS directory where fio will create chuncks", + .category = FIO_OPT_C_ENGINE, + .group = FIO_OPT_G_HDFS, + }, + { + .name = "chunck_size", + .type = FIO_OPT_INT, + .off1 = offsetof(struct hdfsio_options, chunck_size), + .def = "1048576", + .help = "Size of individual chunck", + .category = FIO_OPT_C_ENGINE, + .group = FIO_OPT_G_HDFS, + }, + { + .name = "single_instance", + .type = FIO_OPT_BOOL, + .off1 = offsetof(struct hdfsio_options, single_instance), + .def = "1", + .help = "Use a single instance", + .category = FIO_OPT_C_ENGINE, + .group = FIO_OPT_G_HDFS, + }, + { + .name = "hdfs_use_direct", + .type = FIO_OPT_BOOL, + .off1 = offsetof(struct hdfsio_options, use_direct), + .def = "0", + .help = "Use readDirect instead of hdfsRead", + .category = FIO_OPT_C_ENGINE, + .group = FIO_OPT_G_HDFS, + }, + { + .name = NULL, + }, +}; + + +static int get_chunck_name(char *dest, char *file_name, uint64_t chunk_id) { + return snprintf(dest, CHUNCK_NAME_LENGTH_MAX, "%s_%lu", file_name, chunk_id); } static int fio_hdfsio_prep(struct thread_data *td, struct io_u *io_u) { - struct hdfsio_data *hd; - hdfsFileInfo *fi; + struct hdfsio_options *options = td->eo; + struct hdfsio_data *hd = td->io_ops->data; unsigned long f_id; - char fname[80]; - int open_flags = 0; - - hd = td->io_ops->data; - - if (hd->curr_file_id == -1) { - /* see comment in fio_hdfsio_setup() function */ - fio_hdfsio_setup_fs_params(hd); - } + char fname[CHUNCK_NAME_LENGTH_MAX]; + int open_flags; /* find out file id based on the offset generated by fio */ - f_id = (io_u->offset / hd->fsbs) + hd->fid_correction; + f_id = floor(io_u->offset / options-> chunck_size); if (f_id == hd->curr_file_id) { /* file is already open */ @@ -79,46 +133,76 @@ static int fio_hdfsio_prep(struct thread_data *td, struct io_u *io_u) } if (hd->curr_file_id != -1) { - hdfsCloseFile(hd->fs, hd->fp); + if ( hdfsCloseFile(hd->fs, hd->fp) == -1) { + log_err("hdfs: unable to close file: %s\n", strerror(errno)); + return errno; + } + hd->curr_file_id = -1; } - if (io_u->ddir == DDIR_READ) { + if (io_u->ddir == DDIR_READ || io_u->ddir == DDIR_SYNC) { open_flags = O_RDONLY; } else if (io_u->ddir == DDIR_WRITE) { open_flags = O_WRONLY; } else { log_err("hdfs: Invalid I/O Operation\n"); + return 0; + } + + get_chunck_name(fname, io_u->file->file_name, f_id); + hd->fp = hdfsOpenFile(hd->fs, fname, open_flags, 0, 0, + options->chunck_size); + if(hd->fp == NULL) { + log_err("hdfs: unable to open file: %s: %d\n", fname, strerror(errno)); + return errno; } - hd->curr_file_id = f_id; - do { - sprintf(fname, ".f%lu", f_id); - fi = hdfsGetPathInfo(hd->fs, fname); - if (fi->mSize >= hd->fsbs || io_u->ddir == DDIR_WRITE) { - /* file has enough data to read OR file is opened in write mode */ - hd->fp = - hdfsOpenFile(hd->fs, fname, open_flags, 0, 0, - hd->fsbs); - if (hd->fp) { - break; - } - } - /* file is empty, so try next file for reading */ - f_id = (f_id + 1) % hd->fscount; - } while (1); return 0; } -static int fio_io_end(struct thread_data *td, struct io_u *io_u, int ret) +static int fio_hdfsio_queue(struct thread_data *td, struct io_u *io_u) { + struct hdfsio_data *hd = td->io_ops->data; + struct hdfsio_options *options = td->eo; + int ret; + unsigned long offset; + + offset = io_u->offset % options->chunck_size; + + if( (io_u->ddir == DDIR_READ || io_u->ddir == DDIR_WRITE) && + hdfsTell(hd->fs, hd->fp) != offset && hdfsSeek(hd->fs, hd->fp, offset) != 0 ) { + log_err("hdfs: seek failed: %s, are you doing random write smaller than chunck size ?\n", strerror(errno)); + io_u->error = errno; + return FIO_Q_COMPLETED; + }; + + // do the IO + if (io_u->ddir == DDIR_READ) { + if (options->use_direct) { + ret = readDirect(hd->fs, hd->fp, io_u->xfer_buf, io_u->xfer_buflen); + } else { + ret = hdfsRead(hd->fs, hd->fp, io_u->xfer_buf, io_u->xfer_buflen); + } + } else if (io_u->ddir == DDIR_WRITE) { + ret = hdfsWrite(hd->fs, hd->fp, io_u->xfer_buf, + io_u->xfer_buflen); + } else if (io_u->ddir == DDIR_SYNC) { + ret = hdfsFlush(hd->fs, hd->fp); + } else { + log_err("hdfs: Invalid I/O Operation: %d\n", io_u->ddir); + ret = EINVAL; + } + + // Check if the IO went fine, or is incomplete if (ret != (int)io_u->xfer_buflen) { if (ret >= 0) { io_u->resid = io_u->xfer_buflen - ret; io_u->error = 0; return FIO_Q_COMPLETED; - } else + } else { io_u->error = errno; + } } if (io_u->error) @@ -127,107 +211,200 @@ static int fio_io_end(struct thread_data *td, struct io_u *io_u, int ret) return FIO_Q_COMPLETED; } -static int fio_hdfsio_queue(struct thread_data *td, struct io_u *io_u) +int fio_hdfsio_open_file(struct thread_data *td, struct fio_file *f) { - struct hdfsio_data *hd; - int ret = 0; - - hd = td->io_ops->data; - - if (io_u->ddir == DDIR_READ) { - ret = - hdfsRead(hd->fs, hd->fp, io_u->xfer_buf, io_u->xfer_buflen); - } else if (io_u->ddir == DDIR_WRITE) { - ret = - hdfsWrite(hd->fs, hd->fp, io_u->xfer_buf, - io_u->xfer_buflen); - } else { - log_err("hdfs: Invalid I/O Operation\n"); + if (td->o.odirect) { + td->error = EINVAL; + return 0; } - return fio_io_end(td, io_u, ret); + return 0; } -int fio_hdfsio_open_file(struct thread_data *td, struct fio_file *f) +int fio_hdfsio_close_file(struct thread_data *td, struct fio_file *f) { - struct hdfsio_data *hd; - - hd = td->io_ops->data; - hd->fs = hdfsConnect(hd->host, hd->port); - hdfsSetWorkingDirectory(hd->fs, "/.perftest"); - hd->fid_correction = (getpid() % hd->numjobs); + struct hdfsio_data *hd = td->io_ops->data; + if (hd->curr_file_id != -1) { + if ( hdfsCloseFile(hd->fs, hd->fp) == -1) { + log_err("hdfs: unable to close file: %s\n", strerror(errno)); + return errno; + } + hd->curr_file_id = -1; + } return 0; } -int fio_hdfsio_close_file(struct thread_data *td, struct fio_file *f) +static int fio_hdfsio_init(struct thread_data *td) { - struct hdfsio_data *hd; - - hd = td->io_ops->data; - hdfsDisconnect(hd->fs); + struct hdfsio_options *options = td->eo; + struct hdfsio_data *hd = td->io_ops->data; + struct fio_file *f; + uint64_t j,k; + int i, failure = 0; + uint8_t buffer[CHUNCK_CREATION_BUFFER_SIZE]; + uint64_t bytes_left; + char fname[CHUNCK_NAME_LENGTH_MAX]; + hdfsFile fp; + hdfsFileInfo *fi; + tOffset fi_size; + + for_each_file(td, f, i) { + k = 0; + for(j=0; j < f->real_file_size; j += options->chunck_size) { + get_chunck_name(fname, f->file_name, k++); + fi = hdfsGetPathInfo(hd->fs, fname); + fi_size = fi ? fi->mSize : 0; + // fill exist and is big enough, nothing to do + if( fi && fi_size >= options->chunck_size) { + continue; + } + fp = hdfsOpenFile(hd->fs, fname, O_WRONLY, 0, 0, + options->chunck_size); + if(fp == NULL) { + failure = errno; + log_err("hdfs: unable to prepare file chunk %s: %s\n", fname, strerror(errno)); + break; + } + bytes_left = options->chunck_size; + memset(buffer, 0, CHUNCK_CREATION_BUFFER_SIZE); + while( bytes_left > CHUNCK_CREATION_BUFFER_SIZE) { + if( hdfsWrite(hd->fs, fp, buffer, CHUNCK_CREATION_BUFFER_SIZE) + != CHUNCK_CREATION_BUFFER_SIZE) { + failure = errno; + log_err("hdfs: unable to prepare file chunk %s: %s\n", fname, strerror(errno)); + break; + }; + bytes_left -= CHUNCK_CREATION_BUFFER_SIZE; + } + if(bytes_left > 0) { + if( hdfsWrite(hd->fs, fp, buffer, bytes_left) + != bytes_left) { + failure = errno; + break; + }; + } + if( hdfsCloseFile(hd->fs, fp) != 0) { + failure = errno; + log_err("hdfs: unable to prepare file chunk %s: %s\n", fname, strerror(errno)); + break; + } + } + if(failure) { + break; + } + } + + if( !failure ) { + fio_file_set_size_known(f); + } - return 0; + return failure; } static int fio_hdfsio_setup(struct thread_data *td) { struct hdfsio_data *hd; struct fio_file *f; - static unsigned int numjobs = 1; /* atleast one job has to be there! */ - numjobs = (td->o.numjobs > numjobs) ? td->o.numjobs : numjobs; + int i; + uint64_t file_size, total_file_size; if (!td->io_ops->data) { - hd = malloc(sizeof(*hd));; - + hd = malloc(sizeof(*hd)); memset(hd, 0, sizeof(*hd)); - td->io_ops->data = hd; + + hd->curr_file_id = -1; - /* separate host and port from filename */ - *(strchr(td->o.filename, ',')) = ' '; - sscanf(td->o.filename, "%s%d", hd->host, &(hd->port)); - - /* read fbs and fcount and based on that set f->real_file_size */ - f = td->files[0]; -#if 0 - /* IMHO, this should be done here instead of fio_hdfsio_prep() - * but somehow calling it here doesn't seem to work, - * some problem with libhdfs that needs to be debugged */ - hd->fs = hdfsConnect(hd->host, hd->port); - fio_hdfsio_setup_fs_params(hd); - hdfsDisconnect(hd->fs); -#else - /* so, as an alternate, using environment variables */ - if (getenv("FIO_HDFS_FCOUNT") && getenv("FIO_HDFS_BS")) { - hd->fscount = atol(getenv("FIO_HDFS_FCOUNT")); - hd->fsbs = atol(getenv("FIO_HDFS_BS")); - } else { - log_err("FIO_HDFS_FCOUNT and/or FIO_HDFS_BS not set.\n"); - return 1; + td->io_ops->data = hd; + } + + total_file_size = 0; + file_size = 0; + + for_each_file(td, f, i) { + if(!td->o.file_size_low) { + file_size = floor(td->o.size / td->o.nr_files); + total_file_size += file_size; } -#endif - f->real_file_size = hd->fscount * hd->fsbs; - - td->o.nr_files = 1; - hd->curr_file_id = -1; - hd->numjobs = numjobs; - fio_file_set_size_known(f); + else if (td->o.file_size_low == td->o.file_size_high) + file_size = td->o.file_size_low; + else { + file_size = get_rand_file_size(td); + } + f->real_file_size = file_size; } + /* If the size doesn't divide nicely with the chunck size, + * make the last files bigger. + * Used only if filesize was not explicitely given + */ + if (!td->o.file_size_low && total_file_size < td->o.size) { + f->real_file_size += (td->o.size - total_file_size); + } + + return 0; +} +static int fio_hdfsio_io_u_init(struct thread_data *td, struct io_u *io_u) +{ + struct hdfsio_data *hd = td->io_ops->data; + struct hdfsio_options *options = td->eo; + int failure; + struct hdfsBuilder *bld; + + if (options->host == NULL || options->port == 0) { + log_err("hdfs: server not defined\n"); + return EINVAL; + } + + bld = hdfsNewBuilder(); + if (!bld) { + failure = errno; + log_err("hdfs: unable to allocate connect builder\n"); + return failure; + } + hdfsBuilderSetNameNode(bld, options->host); + hdfsBuilderSetNameNodePort(bld, options->port); + if(! options->single_instance) { + hdfsBuilderSetForceNewInstance(bld); + } + hd->fs = hdfsBuilderConnect(bld); + + /* hdfsSetWorkingDirectory succeed on non existend directory */ + if (hdfsExists(hd->fs, options->directory) < 0 || hdfsSetWorkingDirectory(hd->fs, options->directory) < 0) { + failure = errno; + log_err("hdfs: invalid working directory %s: %s\n", options->directory, strerror(errno)); + return failure; + } + return 0; } +static void fio_hdfsio_io_u_free(struct thread_data *td, struct io_u *io_u) +{ + struct hdfsio_data *hd = td->io_ops->data; + + if (hd->fs && hdfsDisconnect(hd->fs) < 0) { + log_err("hdfs: disconnect failed: %d\n", errno); + } +} + static struct ioengine_ops ioengine_hdfs = { .name = "libhdfs", .version = FIO_IOOPS_VERSION, + .flags = FIO_SYNCIO | FIO_DISKLESSIO | FIO_NODISKUTIL, .setup = fio_hdfsio_setup, + .init = fio_hdfsio_init, .prep = fio_hdfsio_prep, .queue = fio_hdfsio_queue, .open_file = fio_hdfsio_open_file, .close_file = fio_hdfsio_close_file, - .flags = FIO_SYNCIO, + .io_u_init = fio_hdfsio_io_u_init, + .io_u_free = fio_hdfsio_io_u_free, + .option_struct_size = sizeof(struct hdfsio_options), + .options = options, }; + static void fio_init fio_hdfsio_register(void) { register_ioengine(&ioengine_hdfs);