/*
 * libhdfs engine
 *
 * This engine helps perform read/write operations on an HDFS cluster using
 * libhdfs. HDFS does not support modification of data once a file is created.
 *
 * To work around that, create many files of small size (e.g. 256k); this
 * engine selects a file based on the offset generated by fio.
 *
 * Thus, random reads and writes can also be achieved with this logic.
 *
 * NOTE: please set the environment variables FIO_HDFS_BS and FIO_HDFS_FCOUNT
 * to appropriate values for this engine to work properly.
 *
 */
| 16 | |
| 17 | #include <stdio.h> |
| 18 | #include <stdlib.h> |
| 19 | #include <unistd.h> |
| 20 | #include <sys/uio.h> |
| 21 | #include <errno.h> |
| 22 | #include <assert.h> |
| 23 | |
| 24 | #include "../fio.h" |
| 25 | |
| 26 | #include "hdfs.h" |
| 27 | |
| 28 | struct hdfsio_data { |
| 29 | char host[256]; |
| 30 | int port; |
| 31 | hdfsFS fs; |
| 32 | hdfsFile fp; |
| 33 | unsigned long fsbs; |
| 34 | unsigned long fscount; |
| 35 | unsigned long curr_file_id; |
| 36 | unsigned int numjobs; |
| 37 | unsigned int fid_correction; |
| 38 | }; |
| 39 | |
| 40 | static int fio_hdfsio_setup_fs_params(struct hdfsio_data *hd) |
| 41 | { |
| 42 | /* make sure that hdfsConnect is invoked before executing this function */ |
| 43 | hdfsSetWorkingDirectory(hd->fs, "/.perftest"); |
| 44 | hd->fp = hdfsOpenFile(hd->fs, ".fcount", O_RDONLY, 0, 0, 0); |
| 45 | if (hd->fp) { |
| 46 | hdfsRead(hd->fs, hd->fp, &(hd->fscount), sizeof(hd->fscount)); |
| 47 | hdfsCloseFile(hd->fs, hd->fp); |
| 48 | } |
| 49 | hd->fp = hdfsOpenFile(hd->fs, ".fbs", O_RDONLY, 0, 0, 0); |
| 50 | if (hd->fp) { |
| 51 | hdfsRead(hd->fs, hd->fp, &(hd->fsbs), sizeof(hd->fsbs)); |
| 52 | hdfsCloseFile(hd->fs, hd->fp); |
| 53 | } |
| 54 | |
| 55 | return 0; |
| 56 | } |
| 57 | |
| 58 | static int fio_hdfsio_prep(struct thread_data *td, struct io_u *io_u) |
| 59 | { |
| 60 | struct hdfsio_data *hd; |
| 61 | hdfsFileInfo *fi; |
| 62 | unsigned long f_id; |
| 63 | char fname[80]; |
| 64 | int open_flags = 0; |
| 65 | |
| 66 | hd = td->io_ops->data; |
| 67 | |
| 68 | if (hd->curr_file_id == -1) { |
| 69 | /* see comment in fio_hdfsio_setup() function */ |
| 70 | fio_hdfsio_setup_fs_params(hd); |
| 71 | } |
| 72 | |
| 73 | /* find out file id based on the offset generated by fio */ |
| 74 | f_id = (io_u->offset / hd->fsbs) + hd->fid_correction; |
| 75 | |
| 76 | if (f_id == hd->curr_file_id) { |
| 77 | /* file is already open */ |
| 78 | return 0; |
| 79 | } |
| 80 | |
| 81 | if (hd->curr_file_id != -1) { |
| 82 | hdfsCloseFile(hd->fs, hd->fp); |
| 83 | } |
| 84 | |
| 85 | if (io_u->ddir == DDIR_READ) { |
| 86 | open_flags = O_RDONLY; |
| 87 | } else if (io_u->ddir == DDIR_WRITE) { |
| 88 | open_flags = O_WRONLY; |
| 89 | } else { |
| 90 | printf("Invalid I/O Operation\n"); |
| 91 | } |
| 92 | |
| 93 | hd->curr_file_id = f_id; |
| 94 | do { |
| 95 | sprintf(fname, ".f%lu", f_id); |
| 96 | fi = hdfsGetPathInfo(hd->fs, fname); |
| 97 | if (fi->mSize >= hd->fsbs || io_u->ddir == DDIR_WRITE) { |
| 98 | /* file has enough data to read OR file is opened in write mode */ |
| 99 | hd->fp = |
| 100 | hdfsOpenFile(hd->fs, fname, open_flags, 0, 0, |
| 101 | hd->fsbs); |
| 102 | if (hd->fp) { |
| 103 | break; |
| 104 | } |
| 105 | } |
| 106 | /* file is empty, so try next file for reading */ |
| 107 | f_id = (f_id + 1) % hd->fscount; |
| 108 | } while (1); |
| 109 | |
| 110 | return 0; |
| 111 | } |
| 112 | |
| 113 | static int fio_io_end(struct thread_data *td, struct io_u *io_u, int ret) |
| 114 | { |
| 115 | if (ret != (int)io_u->xfer_buflen) { |
| 116 | if (ret >= 0) { |
| 117 | io_u->resid = io_u->xfer_buflen - ret; |
| 118 | io_u->error = 0; |
| 119 | return FIO_Q_COMPLETED; |
| 120 | } else |
| 121 | io_u->error = errno; |
| 122 | } |
| 123 | |
| 124 | if (io_u->error) |
| 125 | td_verror(td, io_u->error, "xfer"); |
| 126 | |
| 127 | return FIO_Q_COMPLETED; |
| 128 | } |
| 129 | |
| 130 | static int fio_hdfsio_queue(struct thread_data *td, struct io_u *io_u) |
| 131 | { |
| 132 | struct hdfsio_data *hd; |
| 133 | int ret = 0; |
| 134 | |
| 135 | hd = td->io_ops->data; |
| 136 | |
| 137 | if (io_u->ddir == DDIR_READ) { |
| 138 | ret = |
| 139 | hdfsRead(hd->fs, hd->fp, io_u->xfer_buf, io_u->xfer_buflen); |
| 140 | } else if (io_u->ddir == DDIR_WRITE) { |
| 141 | ret = |
| 142 | hdfsWrite(hd->fs, hd->fp, io_u->xfer_buf, |
| 143 | io_u->xfer_buflen); |
| 144 | } else { |
| 145 | printf("Invalid I/O Operation\n"); |
| 146 | } |
| 147 | |
| 148 | return fio_io_end(td, io_u, ret); |
| 149 | } |
| 150 | |
| 151 | int fio_hdfsio_open_file(struct thread_data *td, struct fio_file *f) |
| 152 | { |
| 153 | struct hdfsio_data *hd; |
| 154 | |
| 155 | hd = td->io_ops->data; |
| 156 | hd->fs = hdfsConnect(hd->host, hd->port); |
| 157 | hdfsSetWorkingDirectory(hd->fs, "/.perftest"); |
| 158 | hd->fid_correction = (getpid() % hd->numjobs); |
| 159 | |
| 160 | return 0; |
| 161 | } |
| 162 | |
| 163 | int fio_hdfsio_close_file(struct thread_data *td, struct fio_file *f) |
| 164 | { |
| 165 | struct hdfsio_data *hd; |
| 166 | |
| 167 | hd = td->io_ops->data; |
| 168 | hdfsDisconnect(hd->fs); |
| 169 | |
| 170 | return 0; |
| 171 | } |
| 172 | |
| 173 | static int fio_hdfsio_setup(struct thread_data *td) |
| 174 | { |
| 175 | struct hdfsio_data *hd; |
| 176 | struct fio_file *f; |
| 177 | static unsigned int numjobs = 1; /* atleast one job has to be there! */ |
| 178 | numjobs = (td->o.numjobs > numjobs) ? td->o.numjobs : numjobs; |
| 179 | |
| 180 | if (!td->io_ops->data) { |
| 181 | hd = malloc(sizeof(*hd));; |
| 182 | |
| 183 | memset(hd, 0, sizeof(*hd)); |
| 184 | td->io_ops->data = hd; |
| 185 | |
| 186 | /* separate host and port from filename */ |
| 187 | *(strchr(td->o.filename, ',')) = ' '; |
| 188 | sscanf(td->o.filename, "%s%d", hd->host, &(hd->port)); |
| 189 | |
| 190 | /* read fbs and fcount and based on that set f->real_file_size */ |
| 191 | f = td->files[0]; |
| 192 | #if 0 |
| 193 | /* IMHO, this should be done here instead of fio_hdfsio_prep() |
| 194 | * but somehow calling it here doesn't seem to work, |
| 195 | * some problem with libhdfs that needs to be debugged */ |
| 196 | hd->fs = hdfsConnect(hd->host, hd->port); |
| 197 | fio_hdfsio_setup_fs_params(hd); |
| 198 | hdfsDisconnect(hd->fs); |
| 199 | #else |
| 200 | /* so, as an alternate, using environment variables */ |
| 201 | if (getenv("FIO_HDFS_FCOUNT") && getenv("FIO_HDFS_BS")) { |
| 202 | hd->fscount = atol(getenv("FIO_HDFS_FCOUNT")); |
| 203 | hd->fsbs = atol(getenv("FIO_HDFS_BS")); |
| 204 | } else { |
| 205 | fprintf(stderr, |
| 206 | "FIO_HDFS_FCOUNT and/or FIO_HDFS_BS not set.\n"); |
| 207 | return 1; |
| 208 | } |
| 209 | #endif |
| 210 | f->real_file_size = hd->fscount * hd->fsbs; |
| 211 | |
| 212 | td->o.nr_files = 1; |
| 213 | hd->curr_file_id = -1; |
| 214 | hd->numjobs = numjobs; |
| 215 | fio_file_set_size_known(f); |
| 216 | } |
| 217 | |
| 218 | return 0; |
| 219 | } |
| 220 | |
| 221 | static struct ioengine_ops ioengine_hdfs = { |
| 222 | .name = "libhdfs", |
| 223 | .version = FIO_IOOPS_VERSION, |
| 224 | .setup = fio_hdfsio_setup, |
| 225 | .prep = fio_hdfsio_prep, |
| 226 | .queue = fio_hdfsio_queue, |
| 227 | .open_file = fio_hdfsio_open_file, |
| 228 | .close_file = fio_hdfsio_close_file, |
| 229 | .flags = FIO_SYNCIO, |
| 230 | }; |
| 231 | |
| 232 | static void fio_init fio_hdfsio_register(void) |
| 233 | { |
| 234 | register_ioengine(&ioengine_hdfs); |
| 235 | } |
| 236 | |
| 237 | static void fio_exit fio_hdfsio_unregister(void) |
| 238 | { |
| 239 | unregister_ioengine(&ioengine_hdfs); |
| 240 | } |