use poll() and rbd_poll_io_events to speed up io retrieval.
[fio.git] / engines / libhdfs.c
1 /*
2  * libhdfs engine
3  *
4  * this engine helps perform read/write operations on hdfs cluster using
5  * libhdfs. hdfs doesnot support modification of data once file is created.
6  *
7  * so to mimic that create many files of small size (e.g 256k), and this
8  * engine select a file based on the offset generated by fio.
9  *
10  * thus, random reads and writes can also be achieved with this logic.
11  *
12  */
13
14 #include <math.h>
15 #include <hdfs.h>
16
17 #include "../fio.h"
18 #include "../optgroup.h"
19
20 #define CHUNCK_NAME_LENGTH_MAX 80
21 #define CHUNCK_CREATION_BUFFER_SIZE 65536
22
23 struct hdfsio_data {
24         hdfsFS fs;
25         hdfsFile fp;
26         uint64_t curr_file_id;
27 };
28
29 struct hdfsio_options {
30         void *pad;                      /* needed because offset can't be 0 for a option defined used offsetof */
31         char *host;
32         char *directory;
33         unsigned int port;
34         unsigned int chunck_size;
35         unsigned int single_instance;
36         unsigned int use_direct;
37 };
38
39 static struct fio_option options[] = {
40         {
41                 .name   = "namenode",
42                 .lname  = "hfds namenode",
43                 .type   = FIO_OPT_STR_STORE,
44                 .off1   = offsetof(struct hdfsio_options, host),
45                 .def    = "localhost",
46                 .help   = "Namenode of the HDFS cluster",
47                 .category = FIO_OPT_C_ENGINE,
48                 .group  = FIO_OPT_G_HDFS,
49         },
50         {
51                 .name   = "hostname",
52                 .lname  = "hfds namenode",
53                 .type   = FIO_OPT_STR_STORE,
54                 .off1   = offsetof(struct hdfsio_options, host),
55                 .def    = "localhost",
56                 .help   = "Namenode of the HDFS cluster",
57                 .category = FIO_OPT_C_ENGINE,
58                 .group  = FIO_OPT_G_HDFS,
59         },
60         {
61                 .name   = "port",
62                 .lname  = "hdfs namenode port",
63                 .type   = FIO_OPT_INT,
64                 .off1   = offsetof(struct hdfsio_options, port),
65                 .def    = "9000",
66                 .minval = 1,
67                 .maxval = 65535,
68                 .help   = "Port used by the HDFS cluster namenode",
69                 .category = FIO_OPT_C_ENGINE,
70                 .group  = FIO_OPT_G_HDFS,
71         },
72         {
73                 .name   = "hdfsdirectory",
74                 .lname  = "hfds directory",
75                 .type   = FIO_OPT_STR_STORE,
76                 .off1   = offsetof(struct hdfsio_options, directory),
77                 .def    = "/",
78                 .help   = "The HDFS directory where fio will create chuncks",
79                 .category = FIO_OPT_C_ENGINE,
80                 .group  = FIO_OPT_G_HDFS,
81         },
82         {
83                 .name   = "chunk_size",
84                 .alias  = "chunck_size",
85                 .lname  = "Chunk size",
86                 .type   = FIO_OPT_INT,
87                 .off1   = offsetof(struct hdfsio_options, chunck_size),
88                 .def    = "1048576",
89                 .help   = "Size of individual chunck",
90                 .category = FIO_OPT_C_ENGINE,
91                 .group  = FIO_OPT_G_HDFS,
92         },
93         {
94                 .name   = "single_instance",
95                 .lname  = "Single Instance",
96                 .type   = FIO_OPT_BOOL,
97                 .off1   = offsetof(struct hdfsio_options, single_instance),
98                 .def    = "1",
99                 .help   = "Use a single instance",
100                 .category = FIO_OPT_C_ENGINE,
101                 .group  = FIO_OPT_G_HDFS,
102         },
103         {
104                 .name   = "hdfs_use_direct",
105                 .lname  = "HDFS Use Direct",
106                 .type   = FIO_OPT_BOOL,
107                 .off1   = offsetof(struct hdfsio_options, use_direct),
108                 .def    = "0",
109                 .help   = "Use readDirect instead of hdfsRead",
110                 .category = FIO_OPT_C_ENGINE,
111                 .group  = FIO_OPT_G_HDFS,
112         },
113         {
114                 .name   = NULL,
115         },
116 };
117
118
119 static int get_chunck_name(char *dest, char *file_name, uint64_t chunk_id) {
120         return snprintf(dest, CHUNCK_NAME_LENGTH_MAX, "%s_%lu", file_name, chunk_id);
121 }
122
123 static int fio_hdfsio_prep(struct thread_data *td, struct io_u *io_u)
124 {
125         struct hdfsio_options *options = td->eo;
126         struct hdfsio_data *hd = td->io_ops_data;
127         unsigned long f_id;
128         char fname[CHUNCK_NAME_LENGTH_MAX];
129         int open_flags;
130
131         /* find out file id based on the offset generated by fio */
132         f_id = floor(io_u->offset / options-> chunck_size);
133
134         if (f_id == hd->curr_file_id) {
135                 /* file is already open */
136                 return 0;
137         }
138
139         if (hd->curr_file_id != -1) {
140                 if ( hdfsCloseFile(hd->fs, hd->fp) == -1) {
141                         log_err("hdfs: unable to close file: %s\n", strerror(errno));
142                         return errno;
143                 }
144                 hd->curr_file_id = -1;
145         }
146
147         if (io_u->ddir == DDIR_READ || io_u->ddir == DDIR_SYNC) {
148                 open_flags = O_RDONLY;
149         } else if (io_u->ddir == DDIR_WRITE) {
150                 open_flags = O_WRONLY;
151         } else {
152                 log_err("hdfs: Invalid I/O Operation\n");
153                 return 0;
154         }
155         
156         get_chunck_name(fname, io_u->file->file_name, f_id);
157         hd->fp = hdfsOpenFile(hd->fs, fname, open_flags, 0, 0,
158                               options->chunck_size);
159         if(hd->fp == NULL) {
160                 log_err("hdfs: unable to open file: %s: %d\n", fname, strerror(errno));
161                 return errno;
162         }
163         hd->curr_file_id = f_id;
164
165         return 0;
166 }
167
168 static int fio_hdfsio_queue(struct thread_data *td, struct io_u *io_u)
169 {
170         struct hdfsio_data *hd = td->io_ops_data;
171         struct hdfsio_options *options = td->eo;
172         int ret;
173         unsigned long offset;
174         
175         offset = io_u->offset % options->chunck_size;
176         
177         if( (io_u->ddir == DDIR_READ || io_u->ddir == DDIR_WRITE) && 
178              hdfsTell(hd->fs, hd->fp) != offset && hdfsSeek(hd->fs, hd->fp, offset) != 0 ) {
179                 log_err("hdfs: seek failed: %s, are you doing random write smaller than chunck size ?\n", strerror(errno));
180                 io_u->error = errno;
181                 return FIO_Q_COMPLETED;
182         };
183
184         // do the IO
185         if (io_u->ddir == DDIR_READ) {
186                 if (options->use_direct) {
187                         ret = readDirect(hd->fs, hd->fp, io_u->xfer_buf, io_u->xfer_buflen);
188                 } else {
189                         ret = hdfsRead(hd->fs, hd->fp, io_u->xfer_buf, io_u->xfer_buflen);
190                 }
191         } else if (io_u->ddir == DDIR_WRITE) {
192                 ret = hdfsWrite(hd->fs, hd->fp, io_u->xfer_buf,
193                                 io_u->xfer_buflen);
194         } else if (io_u->ddir == DDIR_SYNC) {
195                 ret = hdfsFlush(hd->fs, hd->fp);
196         } else {
197                 log_err("hdfs: Invalid I/O Operation: %d\n", io_u->ddir);
198                 ret = EINVAL;
199         }
200
201         // Check if the IO went fine, or is incomplete
202         if (ret != (int)io_u->xfer_buflen) {
203                 if (ret >= 0) {
204                         io_u->resid = io_u->xfer_buflen - ret;
205                         io_u->error = 0;
206                         return FIO_Q_COMPLETED;
207                 } else {
208                         io_u->error = errno;
209                 }
210         }
211
212         if (io_u->error)
213                 td_verror(td, io_u->error, "xfer");
214
215         return FIO_Q_COMPLETED;
216 }
217
218 int fio_hdfsio_open_file(struct thread_data *td, struct fio_file *f)
219 {
220         if (td->o.odirect) {
221                 td->error = EINVAL;
222                 return 0;
223         }
224
225         return 0;
226 }
227
228 int fio_hdfsio_close_file(struct thread_data *td, struct fio_file *f)
229 {
230         struct hdfsio_data *hd = td->io_ops_data;
231
232         if (hd->curr_file_id != -1) {
233                 if ( hdfsCloseFile(hd->fs, hd->fp) == -1) {
234                         log_err("hdfs: unable to close file: %s\n", strerror(errno));
235                         return errno;
236                 }
237                 hd->curr_file_id = -1;
238         }
239         return 0;
240 }
241
242 static int fio_hdfsio_init(struct thread_data *td)
243 {
244         struct hdfsio_options *options = td->eo;
245         struct hdfsio_data *hd = td->io_ops_data;
246         struct fio_file *f;
247         uint64_t j,k;
248         int i, failure = 0;
249         uint8_t buffer[CHUNCK_CREATION_BUFFER_SIZE];
250         uint64_t bytes_left;    
251         char fname[CHUNCK_NAME_LENGTH_MAX];     
252         hdfsFile fp;
253         hdfsFileInfo *fi;
254         tOffset fi_size;
255
256         for_each_file(td, f, i) {
257                 k = 0;
258                 for(j=0; j < f->real_file_size; j += options->chunck_size) {
259                         get_chunck_name(fname, f->file_name, k++);
260                         fi = hdfsGetPathInfo(hd->fs, fname);
261                         fi_size = fi ? fi->mSize : 0;
262                         // fill exist and is big enough, nothing to do
263                         if( fi && fi_size >= options->chunck_size) {
264                                 continue;
265                         }
266                         fp = hdfsOpenFile(hd->fs, fname, O_WRONLY, 0, 0,
267                                           options->chunck_size);
268                         if(fp == NULL) {
269                                 failure = errno;
270                                 log_err("hdfs: unable to prepare file chunk %s: %s\n", fname, strerror(errno));
271                                 break;
272                         }
273                         bytes_left = options->chunck_size;
274                         memset(buffer, 0, CHUNCK_CREATION_BUFFER_SIZE);
275                         while( bytes_left > CHUNCK_CREATION_BUFFER_SIZE) {
276                                 if( hdfsWrite(hd->fs, fp, buffer, CHUNCK_CREATION_BUFFER_SIZE)
277                                     != CHUNCK_CREATION_BUFFER_SIZE) {
278                                         failure = errno;
279                                         log_err("hdfs: unable to prepare file chunk %s: %s\n", fname, strerror(errno));
280                                         break;
281                                 };
282                                 bytes_left -= CHUNCK_CREATION_BUFFER_SIZE;
283                         }
284                         if(bytes_left > 0) {
285                                 if( hdfsWrite(hd->fs, fp, buffer, bytes_left)
286                                     != bytes_left) {
287                                         failure = errno;
288                                         break;
289                                 };
290                         }
291                         if( hdfsCloseFile(hd->fs, fp) != 0) {
292                                 failure = errno;
293                                 log_err("hdfs: unable to prepare file chunk %s: %s\n", fname, strerror(errno));
294                                 break;
295                         }
296                 }
297                 if(failure) {
298                         break;
299                 }
300         }
301         
302         if( !failure ) {
303                 fio_file_set_size_known(f);
304         }
305
306         return failure;
307 }
308
309 static int fio_hdfsio_setup(struct thread_data *td)
310 {
311         struct hdfsio_data *hd;
312         struct fio_file *f;
313         int i;
314         uint64_t file_size, total_file_size;
315
316         if (!td->io_ops_data) {
317                 hd = malloc(sizeof(*hd));
318                 memset(hd, 0, sizeof(*hd));
319                 
320                 hd->curr_file_id = -1;
321
322                 td->io_ops_data = hd;
323         }
324         
325         total_file_size = 0;
326         file_size = 0;
327
328         for_each_file(td, f, i) {
329                 if(!td->o.file_size_low) {
330                         file_size = floor(td->o.size / td->o.nr_files);
331                         total_file_size += file_size;
332                 }
333                 else if (td->o.file_size_low == td->o.file_size_high)
334                         file_size = td->o.file_size_low;
335                 else {
336                         file_size = get_rand_file_size(td);
337                 }
338                 f->real_file_size = file_size;
339         }
340         /* If the size doesn't divide nicely with the chunck size,
341          * make the last files bigger.
342          * Used only if filesize was not explicitely given
343          */
344         if (!td->o.file_size_low && total_file_size < td->o.size) {
345                 f->real_file_size += (td->o.size - total_file_size);
346         }
347
348         return 0;
349 }
350
351 static int fio_hdfsio_io_u_init(struct thread_data *td, struct io_u *io_u)
352 {
353         struct hdfsio_data *hd = td->io_ops_data;
354         struct hdfsio_options *options = td->eo;
355         int failure;
356         struct hdfsBuilder *bld;
357
358         if (options->host == NULL || options->port == 0) {
359                 log_err("hdfs: server not defined\n");
360                 return EINVAL;
361         }
362         
363         bld = hdfsNewBuilder();
364         if (!bld) {
365                 failure = errno;
366                 log_err("hdfs: unable to allocate connect builder\n");
367                 return failure;
368         }
369         hdfsBuilderSetNameNode(bld, options->host);
370         hdfsBuilderSetNameNodePort(bld, options->port);
371         if(! options->single_instance) {
372                 hdfsBuilderSetForceNewInstance(bld);
373         }
374         hd->fs = hdfsBuilderConnect(bld);
375         
376         /* hdfsSetWorkingDirectory succeed on non existend directory */
377         if (hdfsExists(hd->fs, options->directory) < 0 || hdfsSetWorkingDirectory(hd->fs, options->directory) < 0) {
378                 failure = errno;
379                 log_err("hdfs: invalid working directory %s: %s\n", options->directory, strerror(errno));
380                 return failure;
381         }
382         
383         return 0;
384 }
385
386 static void fio_hdfsio_io_u_free(struct thread_data *td, struct io_u *io_u)
387 {
388         struct hdfsio_data *hd = td->io_ops_data;
389
390         if (hd->fs && hdfsDisconnect(hd->fs) < 0) {
391                 log_err("hdfs: disconnect failed: %d\n", errno);
392         }
393 }
394
395 static struct ioengine_ops ioengine_hdfs = {
396         .name = "libhdfs",
397         .version = FIO_IOOPS_VERSION,
398         .flags = FIO_SYNCIO | FIO_DISKLESSIO | FIO_NODISKUTIL,
399         .setup = fio_hdfsio_setup,
400         .init = fio_hdfsio_init,
401         .prep = fio_hdfsio_prep,
402         .queue = fio_hdfsio_queue,
403         .open_file = fio_hdfsio_open_file,
404         .close_file = fio_hdfsio_close_file,
405         .io_u_init = fio_hdfsio_io_u_init,
406         .io_u_free = fio_hdfsio_io_u_free,
407         .option_struct_size     = sizeof(struct hdfsio_options),
408         .options                = options,
409 };
410
411
412 static void fio_init fio_hdfsio_register(void)
413 {
414         register_ioengine(&ioengine_hdfs);
415 }
416
417 static void fio_exit fio_hdfsio_unregister(void)
418 {
419         unregister_ioengine(&ioengine_hdfs);
420 }