4 * this engine helps perform read/write operations on hdfs cluster using
5 * libhdfs. hdfs does not support modification of data once a file is created.
7 * so, to mimic that, create many files of small size (e.g. 256k), and this
8 * engine selects a file based on the offset generated by fio.
10 * thus, random reads and writes can also be achieved with this logic.
18 #include "../optgroup.h"
/* Size in bytes of the buffers that receive a generated chunk file name. */
20 #define CHUNCK_NAME_LENGTH_MAX 80
/* Size in bytes of the zero-filled scratch buffer used when writing out chunks. */
21 #define CHUNCK_CREATION_BUFFER_SIZE 65536
/* Id of the chunk file currently held open; -1 is stored while no chunk is open. */
26 uint64_t curr_file_id;
/* Settings of this engine (reached through td->eo); the option table writes into these fields via offsetof(). */
29 struct hdfsio_options {
30 void *pad; /* needed because offset can't be 0 for an option defined using offsetof */
/* Size of one chunk file in bytes: I/O offsets are divided by it and new chunks are filled up to it. */
34 unsigned int chunck_size;
/* When zero, the connection builder is told to force a new instance. */
35 unsigned int single_instance;
/* When non-zero, reads go through readDirect() instead of hdfsRead(). */
36 unsigned int use_direct;
/* Option table of the engine: each entry binds one option to a field of struct hdfsio_options through offsetof(). */
39 static struct fio_option options[] = {
/*
 * Namenode host, stored as a string in hdfsio_options.host.
 * NOTE(review): the lname strings below spell "hfds" - looks like a typo for
 * "hdfs"; they are runtime strings, so they are left untouched here.
 */
42 .lname = "hfds namenode",
43 .type = FIO_OPT_STR_STORE,
44 .off1 = offsetof(struct hdfsio_options, host),
46 .help = "Namenode of the HDFS cluster",
47 .category = FIO_OPT_C_ENGINE,
48 .group = FIO_OPT_G_HDFS,
/* A second entry that also stores into hdfsio_options.host, with the same lname and help text. */
52 .lname = "hfds namenode",
53 .type = FIO_OPT_STR_STORE,
54 .off1 = offsetof(struct hdfsio_options, host),
56 .help = "Namenode of the HDFS cluster",
57 .category = FIO_OPT_C_ENGINE,
58 .group = FIO_OPT_G_HDFS,
/* Namenode port, stored in hdfsio_options.port. */
62 .lname = "hdfs namenode port",
64 .off1 = offsetof(struct hdfsio_options, port),
68 .help = "Port used by the HDFS cluster namenode",
69 .category = FIO_OPT_C_ENGINE,
70 .group = FIO_OPT_G_HDFS,
/* HDFS directory in which the chunks are created, stored as a string in hdfsio_options.directory. */
73 .name = "hdfsdirectory",
74 .lname = "hfds directory",
75 .type = FIO_OPT_STR_STORE,
76 .off1 = offsetof(struct hdfsio_options, directory),
78 .help = "The HDFS directory where fio will create chunks",
79 .category = FIO_OPT_C_ENGINE,
80 .group = FIO_OPT_G_HDFS,
/* Chunk size, stored in hdfsio_options.chunck_size; the misspelled "chunck_size" is declared as an alias. */
84 .alias = "chunck_size",
85 .lname = "Chunk size",
87 .off1 = offsetof(struct hdfsio_options, chunck_size),
89 .help = "Size of individual chunk",
90 .category = FIO_OPT_C_ENGINE,
91 .group = FIO_OPT_G_HDFS,
/* Single-instance switch, stored in hdfsio_options.single_instance. */
94 .name = "single_instance",
95 .lname = "Single Instance",
97 .off1 = offsetof(struct hdfsio_options, single_instance),
99 .help = "Use a single instance",
100 .category = FIO_OPT_C_ENGINE,
101 .group = FIO_OPT_G_HDFS,
/* Boolean that selects readDirect over hdfsRead, stored in hdfsio_options.use_direct. */
104 .name = "hdfs_use_direct",
105 .lname = "HDFS Use Direct",
106 .type = FIO_OPT_BOOL,
107 .off1 = offsetof(struct hdfsio_options, use_direct),
109 .help = "Use readDirect instead of hdfsRead",
110 .category = FIO_OPT_C_ENGINE,
111 .group = FIO_OPT_G_HDFS,
/*
 * Format the name of chunk number chunk_id of file_name into dest as
 * "<file_name>_<chunk_id>", writing at most CHUNCK_NAME_LENGTH_MAX bytes
 * (terminator included). Returns the snprintf() result, i.e. the length the
 * full name would have had; the callers in this file ignore it, so a
 * truncated name goes unnoticed.
 * NOTE(review): "%lu" is paired with a uint64_t argument; the two only match
 * where unsigned long is 64 bits wide - consider PRIu64 from <inttypes.h>.
 */
119 static int get_chunck_name(char *dest, char *file_name, uint64_t chunk_id) {
120 return snprintf(dest, CHUNCK_NAME_LENGTH_MAX, "%s_%lu", file_name, chunk_id);
/*
 * prep hook: work out which chunk file covers io_u->offset and make it the
 * currently open file. If that chunk is already open nothing is reopened;
 * otherwise the chunk held open (if any) is closed and the new one is opened
 * with a mode derived from the I/O direction.
 */
123 static int fio_hdfsio_prep(struct thread_data *td, struct io_u *io_u)
125 struct hdfsio_options *options = td->eo;
126 struct hdfsio_data *hd = td->io_ops_data;
128 char fname[CHUNCK_NAME_LENGTH_MAX];
131 /* find out file id based on the offset generated by fio */
/* NOTE(review): if both operands are integers the division already truncates and floor() adds nothing - confirm the type of io_u->offset. */
132 f_id = floor(io_u->offset / options-> chunck_size);
134 if (f_id == hd->curr_file_id) {
135 /* file is already open */
/* -1 marks "no chunk open", so a close is attempted only when a chunk is actually held. */
139 if (hd->curr_file_id != -1) {
140 if ( hdfsCloseFile(hd->fs, hd->fp) == -1) {
141 log_err("hdfs: unable to close file: %s\n", strerror(errno));
144 hd->curr_file_id = -1;
/* Reads and syncs open the chunk read-only, writes open it write-only; any other direction is reported as invalid. */
147 if (io_u->ddir == DDIR_READ || io_u->ddir == DDIR_SYNC) {
148 open_flags = O_RDONLY;
149 } else if (io_u->ddir == DDIR_WRITE) {
150 open_flags = O_WRONLY;
152 log_err("hdfs: Invalid I/O Operation\n");
156 get_chunck_name(fname, io_u->file->file_name, f_id);
/* The chunk size is passed as the last hdfsOpenFile argument - presumably the HDFS block size; verify against the libhdfs API. */
157 hd->fp = hdfsOpenFile(hd->fs, fname, open_flags, 0, 0,
158 options->chunck_size);
/* NOTE(review): "%d" is given strerror(errno), which is a char * - format/argument mismatch; "%s" looks intended. */
160 log_err("hdfs: unable to open file: %s: %d\n", fname, strerror(errno));
/* Remember which chunk is now open so the next prep can skip the reopen. */
163 hd->curr_file_id = f_id;
/*
 * queue hook: carry out the read, write or sync described by io_u on the
 * chunk file that is currently open (hd->fp). Every visible return path
 * yields FIO_Q_COMPLETED.
 */
168 static enum fio_q_status fio_hdfsio_queue(struct thread_data *td,
171 struct hdfsio_data *hd = td->io_ops_data;
172 struct hdfsio_options *options = td->eo;
174 unsigned long offset;
/* Position of the request inside its chunk. */
176 offset = io_u->offset % options->chunck_size;
/* For reads and writes, seek only when the file position differs from the wanted offset; a failing seek is logged and ends the request. */
178 if( (io_u->ddir == DDIR_READ || io_u->ddir == DDIR_WRITE) &&
179 hdfsTell(hd->fs, hd->fp) != offset && hdfsSeek(hd->fs, hd->fp, offset) != 0 ) {
180 log_err("hdfs: seek failed: %s, are you doing random write smaller than chunk size ?\n", strerror(errno));
182 return FIO_Q_COMPLETED;
/* Dispatch on the direction: read (readDirect or hdfsRead depending on use_direct), write, or flush for sync. */
186 if (io_u->ddir == DDIR_READ) {
187 if (options->use_direct) {
188 ret = readDirect(hd->fs, hd->fp, io_u->xfer_buf, io_u->xfer_buflen);
190 ret = hdfsRead(hd->fs, hd->fp, io_u->xfer_buf, io_u->xfer_buflen);
192 } else if (io_u->ddir == DDIR_WRITE) {
193 ret = hdfsWrite(hd->fs, hd->fp, io_u->xfer_buf,
195 } else if (io_u->ddir == DDIR_SYNC) {
196 ret = hdfsFlush(hd->fs, hd->fp);
198 log_err("hdfs: Invalid I/O Operation: %d\n", io_u->ddir);
/* NOTE(review): on the sync path ret is the hdfsFlush() status, yet it is compared with xfer_buflen as well - confirm this is intended. */
202 // Check if the IO went fine, or is incomplete
203 if (ret != (int)io_u->xfer_buflen) {
/* Short transfer: record how many bytes were not moved. */
205 io_u->resid = io_u->xfer_buflen - ret;
207 return FIO_Q_COMPLETED;
214 td_verror(td, io_u->error, "xfer");
216 return FIO_Q_COMPLETED;
/*
 * Registered as the engine's .open_file hook.
 * NOTE(review): not declared static, unlike the prep/queue/init/setup hooks -
 * confirm whether external linkage is intended.
 */
219 int fio_hdfsio_open_file(struct thread_data *td, struct fio_file *f)
/*
 * Registered as the engine's .close_file hook: closes the chunk that is
 * currently open, if any, logs a failed close, and sets curr_file_id back
 * to -1.
 * NOTE(review): not declared static, unlike the prep/queue/init/setup hooks -
 * confirm whether external linkage is intended.
 */
229 int fio_hdfsio_close_file(struct thread_data *td, struct fio_file *f)
231 struct hdfsio_data *hd = td->io_ops_data;
/* -1 means no chunk is open, so there is nothing to close in that case. */
233 if (hd->curr_file_id != -1) {
234 if ( hdfsCloseFile(hd->fs, hd->fp) == -1) {
235 log_err("hdfs: unable to close file: %s\n", strerror(errno));
238 hd->curr_file_id = -1;
/*
 * init hook: walk over every file of the job in chunck_size steps and make
 * sure each chunk exists on HDFS with at least chunck_size bytes. Chunks that
 * are missing or too small are (re)written with zero bytes.
 */
243 static int fio_hdfsio_init(struct thread_data *td)
245 struct hdfsio_options *options = td->eo;
246 struct hdfsio_data *hd = td->io_ops_data;
250 uint8_t buffer[CHUNCK_CREATION_BUFFER_SIZE];
252 char fname[CHUNCK_NAME_LENGTH_MAX];
257 for_each_file(td, f, i) {
/* One iteration per chunk of the file; k supplies the chunk id used in the name. */
259 for(j=0; j < f->real_file_size; j += options->chunck_size) {
260 get_chunck_name(fname, f->file_name, k++);
/* NOTE(review): confirm the hdfsFileInfo returned here is released with hdfsFreeFileInfo(). */
261 fi = hdfsGetPathInfo(hd->fs, fname);
262 fi_size = fi ? fi->mSize : 0;
263 // file exists and is big enough, nothing to do
264 if( fi && fi_size >= options->chunck_size) {
/* Otherwise open the chunk for writing and fill it. */
267 fp = hdfsOpenFile(hd->fs, fname, O_WRONLY, 0, 0,
268 options->chunck_size);
271 log_err("hdfs: unable to prepare file chunk %s: %s\n", fname, strerror(errno));
/* Write zeros one scratch buffer at a time while more than a buffer is left; the strict '>' leaves the final write between 1 and a full buffer. */
274 bytes_left = options->chunck_size;
275 memset(buffer, 0, CHUNCK_CREATION_BUFFER_SIZE);
276 while( bytes_left > CHUNCK_CREATION_BUFFER_SIZE) {
277 if( hdfsWrite(hd->fs, fp, buffer, CHUNCK_CREATION_BUFFER_SIZE)
278 != CHUNCK_CREATION_BUFFER_SIZE) {
280 log_err("hdfs: unable to prepare file chunk %s: %s\n", fname, strerror(errno));
283 bytes_left -= CHUNCK_CREATION_BUFFER_SIZE;
/* Final write of whatever is still left. */
286 if( hdfsWrite(hd->fs, fp, buffer, bytes_left)
/* A failed close is reported with the same "unable to prepare" message. */
292 if( hdfsCloseFile(hd->fs, fp) != 0) {
294 log_err("hdfs: unable to prepare file chunk %s: %s\n", fname, strerror(errno));
304 fio_file_set_size_known(f);
/*
 * setup hook: allocate the engine state on first use and assign a size
 * (real_file_size) to every file of the job.
 */
310 static int fio_hdfsio_setup(struct thread_data *td)
312 struct hdfsio_data *hd;
/* NOTE(review): total_file_size is accumulated with += below; confirm it is zeroed before the loop. */
315 uint64_t file_size, total_file_size;
/* First call for this thread: create zeroed state with "no chunk open". */
/* NOTE(review): no NULL check is visible between malloc() and memset(). */
317 if (!td->io_ops_data) {
318 hd = malloc(sizeof(*hd));
319 memset(hd, 0, sizeof(*hd));
321 hd->curr_file_id = -1;
323 td->io_ops_data = hd;
/*
 * Per-file size: with no explicit file size, split td->o.size evenly over
 * nr_files; with equal low/high bounds use that value; otherwise take the
 * value from get_rand_file_size().
 */
329 for_each_file(td, f, i) {
330 if(!td->o.file_size_low) {
331 file_size = floor(td->o.size / td->o.nr_files);
332 total_file_size += file_size;
334 else if (td->o.file_size_low == td->o.file_size_high)
335 file_size = td->o.file_size_low;
337 file_size = get_rand_file_size(td);
339 f->real_file_size = file_size;
341 /* If the size doesn't divide nicely with the chunk size,
342 * make the last files bigger.
343 * Used only if filesize was not explicitly given
345 if (!td->o.file_size_low && total_file_size < td->o.size) {
346 f->real_file_size += (td->o.size - total_file_size);
/*
 * io_u_init hook: connect to the configured HDFS namenode, store the
 * connection in hd->fs, and switch to the configured working directory.
 */
352 static int fio_hdfsio_io_u_init(struct thread_data *td, struct io_u *io_u)
354 struct hdfsio_data *hd = td->io_ops_data;
355 struct hdfsio_options *options = td->eo;
357 struct hdfsBuilder *bld;
/* A host and a non-zero port are both required. */
359 if (options->host == NULL || options->port == 0) {
360 log_err("hdfs: server not defined\n");
364 bld = hdfsNewBuilder();
367 log_err("hdfs: unable to allocate connect builder\n");
370 hdfsBuilderSetNameNode(bld, options->host);
371 hdfsBuilderSetNameNodePort(bld, options->port);
/* Unless single_instance is set, ask the builder for a fresh instance. */
372 if(! options->single_instance) {
373 hdfsBuilderSetForceNewInstance(bld);
/* NOTE(review): confirm the hdfsBuilderConnect() result is checked before hd->fs is used below. */
375 hd->fs = hdfsBuilderConnect(bld);
377 /* hdfsSetWorkingDirectory succeed on non-existent directory */
378 if (hdfsExists(hd->fs, options->directory) < 0 || hdfsSetWorkingDirectory(hd->fs, options->directory) < 0) {
380 log_err("hdfs: invalid working directory %s: %s\n", options->directory, strerror(errno));
/*
 * io_u_free hook: disconnect from HDFS when a connection handle is present;
 * a failed disconnect is logged together with the numeric errno.
 */
387 static void fio_hdfsio_io_u_free(struct thread_data *td, struct io_u *io_u)
389 struct hdfsio_data *hd = td->io_ops_data;
391 if (hd->fs && hdfsDisconnect(hd->fs) < 0) {
392 log_err("hdfs: disconnect failed: %d\n", errno);
/*
 * Engine descriptor passed to register_ioengine()/unregister_ioengine(): it
 * wires the hook functions of this file into fio and announces the size of
 * the engine's option struct.
 */
396 FIO_STATIC struct ioengine_ops ioengine = {
398 .version = FIO_IOOPS_VERSION,
399 .flags = FIO_SYNCIO | FIO_DISKLESSIO | FIO_NODISKUTIL,
400 .setup = fio_hdfsio_setup,
401 .init = fio_hdfsio_init,
402 .prep = fio_hdfsio_prep,
403 .queue = fio_hdfsio_queue,
404 .open_file = fio_hdfsio_open_file,
405 .close_file = fio_hdfsio_close_file,
/* The HDFS connection is set up and torn down in the io_u init/free hooks. */
406 .io_u_init = fio_hdfsio_io_u_init,
407 .io_u_free = fio_hdfsio_io_u_free,
408 .option_struct_size = sizeof(struct hdfsio_options),
/* Register this engine with fio. Presumably fio_init makes this run automatically at load time - TODO confirm. */
413 static void fio_init fio_hdfsio_register(void)
415 register_ioengine(&ioengine);
/* Remove this engine from fio again. Presumably fio_exit makes this run automatically at unload/exit - TODO confirm. */
418 static void fio_exit fio_hdfsio_unregister(void)
420 unregister_ioengine(&ioengine);