Don't malloc/memcpy ioengine_ops on td initialization
[fio.git] / engines / libhdfs.c
1 /*
2  * libhdfs engine
3  *
4  * this engine helps perform read/write operations on hdfs cluster using
5  * libhdfs. hdfs doesnot support modification of data once file is created.
6  *
7  * so to mimic that create many files of small size (e.g 256k), and this
8  * engine select a file based on the offset generated by fio.
9  *
10  * thus, random reads and writes can also be achieved with this logic.
11  *
12  */
13
14 #include <math.h>
15 #include <hdfs.h>
16
17 #include "../fio.h"
18 #include "../optgroup.h"
19
20 #define CHUNCK_NAME_LENGTH_MAX 80
21 #define CHUNCK_CREATION_BUFFER_SIZE 65536
22
23 struct hdfsio_data {
24         hdfsFS fs;
25         hdfsFile fp;
26         uint64_t curr_file_id;
27 };
28
29 struct hdfsio_options {
30         void *pad;                      /* needed because offset can't be 0 for a option defined used offsetof */
31         char *host;
32         char *directory;
33         unsigned int port;
34         unsigned int chunck_size;
35         unsigned int single_instance;
36         unsigned int use_direct;
37 };
38
39 static struct fio_option options[] = {
40         {
41                 .name   = "namenode",
42                 .lname  = "hfds namenode",
43                 .type   = FIO_OPT_STR_STORE,
44                 .off1   = offsetof(struct hdfsio_options, host),
45                 .def    = "localhost",
46                 .help   = "Namenode of the HDFS cluster",
47                 .category = FIO_OPT_C_ENGINE,
48                 .group  = FIO_OPT_G_HDFS,
49         },
50         {
51                 .name   = "hostname",
52                 .lname  = "hfds namenode",
53                 .type   = FIO_OPT_STR_STORE,
54                 .off1   = offsetof(struct hdfsio_options, host),
55                 .def    = "localhost",
56                 .help   = "Namenode of the HDFS cluster",
57                 .category = FIO_OPT_C_ENGINE,
58                 .group  = FIO_OPT_G_HDFS,
59         },
60         {
61                 .name   = "port",
62                 .lname  = "hdfs namenode port",
63                 .type   = FIO_OPT_INT,
64                 .off1   = offsetof(struct hdfsio_options, port),
65                 .def    = "9000",
66                 .minval = 1,
67                 .maxval = 65535,
68                 .help   = "Port used by the HDFS cluster namenode",
69                 .category = FIO_OPT_C_ENGINE,
70                 .group  = FIO_OPT_G_HDFS,
71         },
72         {
73                 .name   = "hdfsdirectory",
74                 .lname  = "hfds directory",
75                 .type   = FIO_OPT_STR_STORE,
76                 .off1   = offsetof(struct hdfsio_options, directory),
77                 .def    = "/",
78                 .help   = "The HDFS directory where fio will create chuncks",
79                 .category = FIO_OPT_C_ENGINE,
80                 .group  = FIO_OPT_G_HDFS,
81         },
82         {
83                 .name   = "chunck_size",
84                 .type   = FIO_OPT_INT,
85                 .off1   = offsetof(struct hdfsio_options, chunck_size),
86                 .def    = "1048576",
87                 .help   = "Size of individual chunck",
88                 .category = FIO_OPT_C_ENGINE,
89                 .group  = FIO_OPT_G_HDFS,
90         },
91         {
92                 .name   = "single_instance",
93                 .type   = FIO_OPT_BOOL,
94                 .off1   = offsetof(struct hdfsio_options, single_instance),
95                 .def    = "1",
96                 .help   = "Use a single instance",
97                 .category = FIO_OPT_C_ENGINE,
98                 .group  = FIO_OPT_G_HDFS,
99         },
100         {
101                 .name   = "hdfs_use_direct",
102                 .type   = FIO_OPT_BOOL,
103                 .off1   = offsetof(struct hdfsio_options, use_direct),
104                 .def    = "0",
105                 .help   = "Use readDirect instead of hdfsRead",
106                 .category = FIO_OPT_C_ENGINE,
107                 .group  = FIO_OPT_G_HDFS,
108         },
109         {
110                 .name   = NULL,
111         },
112 };
113
114
115 static int get_chunck_name(char *dest, char *file_name, uint64_t chunk_id) {
116         return snprintf(dest, CHUNCK_NAME_LENGTH_MAX, "%s_%lu", file_name, chunk_id);
117 }
118
119 static int fio_hdfsio_prep(struct thread_data *td, struct io_u *io_u)
120 {
121         struct hdfsio_options *options = td->eo;
122         struct hdfsio_data *hd = td->io_ops_data;
123         unsigned long f_id;
124         char fname[CHUNCK_NAME_LENGTH_MAX];
125         int open_flags;
126
127         /* find out file id based on the offset generated by fio */
128         f_id = floor(io_u->offset / options-> chunck_size);
129
130         if (f_id == hd->curr_file_id) {
131                 /* file is already open */
132                 return 0;
133         }
134
135         if (hd->curr_file_id != -1) {
136                 if ( hdfsCloseFile(hd->fs, hd->fp) == -1) {
137                         log_err("hdfs: unable to close file: %s\n", strerror(errno));
138                         return errno;
139                 }
140                 hd->curr_file_id = -1;
141         }
142
143         if (io_u->ddir == DDIR_READ || io_u->ddir == DDIR_SYNC) {
144                 open_flags = O_RDONLY;
145         } else if (io_u->ddir == DDIR_WRITE) {
146                 open_flags = O_WRONLY;
147         } else {
148                 log_err("hdfs: Invalid I/O Operation\n");
149                 return 0;
150         }
151         
152         get_chunck_name(fname, io_u->file->file_name, f_id);
153         hd->fp = hdfsOpenFile(hd->fs, fname, open_flags, 0, 0,
154                               options->chunck_size);
155         if(hd->fp == NULL) {
156                 log_err("hdfs: unable to open file: %s: %d\n", fname, strerror(errno));
157                 return errno;
158         }
159         hd->curr_file_id = f_id;
160
161         return 0;
162 }
163
164 static int fio_hdfsio_queue(struct thread_data *td, struct io_u *io_u)
165 {
166         struct hdfsio_data *hd = td->io_ops_data;
167         struct hdfsio_options *options = td->eo;
168         int ret;
169         unsigned long offset;
170         
171         offset = io_u->offset % options->chunck_size;
172         
173         if( (io_u->ddir == DDIR_READ || io_u->ddir == DDIR_WRITE) && 
174              hdfsTell(hd->fs, hd->fp) != offset && hdfsSeek(hd->fs, hd->fp, offset) != 0 ) {
175                 log_err("hdfs: seek failed: %s, are you doing random write smaller than chunck size ?\n", strerror(errno));
176                 io_u->error = errno;
177                 return FIO_Q_COMPLETED;
178         };
179
180         // do the IO
181         if (io_u->ddir == DDIR_READ) {
182                 if (options->use_direct) {
183                         ret = readDirect(hd->fs, hd->fp, io_u->xfer_buf, io_u->xfer_buflen);
184                 } else {
185                         ret = hdfsRead(hd->fs, hd->fp, io_u->xfer_buf, io_u->xfer_buflen);
186                 }
187         } else if (io_u->ddir == DDIR_WRITE) {
188                 ret = hdfsWrite(hd->fs, hd->fp, io_u->xfer_buf,
189                                 io_u->xfer_buflen);
190         } else if (io_u->ddir == DDIR_SYNC) {
191                 ret = hdfsFlush(hd->fs, hd->fp);
192         } else {
193                 log_err("hdfs: Invalid I/O Operation: %d\n", io_u->ddir);
194                 ret = EINVAL;
195         }
196
197         // Check if the IO went fine, or is incomplete
198         if (ret != (int)io_u->xfer_buflen) {
199                 if (ret >= 0) {
200                         io_u->resid = io_u->xfer_buflen - ret;
201                         io_u->error = 0;
202                         return FIO_Q_COMPLETED;
203                 } else {
204                         io_u->error = errno;
205                 }
206         }
207
208         if (io_u->error)
209                 td_verror(td, io_u->error, "xfer");
210
211         return FIO_Q_COMPLETED;
212 }
213
214 int fio_hdfsio_open_file(struct thread_data *td, struct fio_file *f)
215 {
216         if (td->o.odirect) {
217                 td->error = EINVAL;
218                 return 0;
219         }
220
221         return 0;
222 }
223
224 int fio_hdfsio_close_file(struct thread_data *td, struct fio_file *f)
225 {
226         struct hdfsio_data *hd = td->io_ops_data;
227
228         if (hd->curr_file_id != -1) {
229                 if ( hdfsCloseFile(hd->fs, hd->fp) == -1) {
230                         log_err("hdfs: unable to close file: %s\n", strerror(errno));
231                         return errno;
232                 }
233                 hd->curr_file_id = -1;
234         }
235         return 0;
236 }
237
238 static int fio_hdfsio_init(struct thread_data *td)
239 {
240         struct hdfsio_options *options = td->eo;
241         struct hdfsio_data *hd = td->io_ops_data;
242         struct fio_file *f;
243         uint64_t j,k;
244         int i, failure = 0;
245         uint8_t buffer[CHUNCK_CREATION_BUFFER_SIZE];
246         uint64_t bytes_left;    
247         char fname[CHUNCK_NAME_LENGTH_MAX];     
248         hdfsFile fp;
249         hdfsFileInfo *fi;
250         tOffset fi_size;
251
252         for_each_file(td, f, i) {
253                 k = 0;
254                 for(j=0; j < f->real_file_size; j += options->chunck_size) {
255                         get_chunck_name(fname, f->file_name, k++);
256                         fi = hdfsGetPathInfo(hd->fs, fname);
257                         fi_size = fi ? fi->mSize : 0;
258                         // fill exist and is big enough, nothing to do
259                         if( fi && fi_size >= options->chunck_size) {
260                                 continue;
261                         }
262                         fp = hdfsOpenFile(hd->fs, fname, O_WRONLY, 0, 0,
263                                           options->chunck_size);
264                         if(fp == NULL) {
265                                 failure = errno;
266                                 log_err("hdfs: unable to prepare file chunk %s: %s\n", fname, strerror(errno));
267                                 break;
268                         }
269                         bytes_left = options->chunck_size;
270                         memset(buffer, 0, CHUNCK_CREATION_BUFFER_SIZE);
271                         while( bytes_left > CHUNCK_CREATION_BUFFER_SIZE) {
272                                 if( hdfsWrite(hd->fs, fp, buffer, CHUNCK_CREATION_BUFFER_SIZE)
273                                     != CHUNCK_CREATION_BUFFER_SIZE) {
274                                         failure = errno;
275                                         log_err("hdfs: unable to prepare file chunk %s: %s\n", fname, strerror(errno));
276                                         break;
277                                 };
278                                 bytes_left -= CHUNCK_CREATION_BUFFER_SIZE;
279                         }
280                         if(bytes_left > 0) {
281                                 if( hdfsWrite(hd->fs, fp, buffer, bytes_left)
282                                     != bytes_left) {
283                                         failure = errno;
284                                         break;
285                                 };
286                         }
287                         if( hdfsCloseFile(hd->fs, fp) != 0) {
288                                 failure = errno;
289                                 log_err("hdfs: unable to prepare file chunk %s: %s\n", fname, strerror(errno));
290                                 break;
291                         }
292                 }
293                 if(failure) {
294                         break;
295                 }
296         }
297         
298         if( !failure ) {
299                 fio_file_set_size_known(f);
300         }
301
302         return failure;
303 }
304
305 static int fio_hdfsio_setup(struct thread_data *td)
306 {
307         struct hdfsio_data *hd;
308         struct fio_file *f;
309         int i;
310         uint64_t file_size, total_file_size;
311
312         if (!td->io_ops_data) {
313                 hd = malloc(sizeof(*hd));
314                 memset(hd, 0, sizeof(*hd));
315                 
316                 hd->curr_file_id = -1;
317
318                 td->io_ops_data = hd;
319         }
320         
321         total_file_size = 0;
322         file_size = 0;
323
324         for_each_file(td, f, i) {
325                 if(!td->o.file_size_low) {
326                         file_size = floor(td->o.size / td->o.nr_files);
327                         total_file_size += file_size;
328                 }
329                 else if (td->o.file_size_low == td->o.file_size_high)
330                         file_size = td->o.file_size_low;
331                 else {
332                         file_size = get_rand_file_size(td);
333                 }
334                 f->real_file_size = file_size;
335         }
336         /* If the size doesn't divide nicely with the chunck size,
337          * make the last files bigger.
338          * Used only if filesize was not explicitely given
339          */
340         if (!td->o.file_size_low && total_file_size < td->o.size) {
341                 f->real_file_size += (td->o.size - total_file_size);
342         }
343
344         return 0;
345 }
346
347 static int fio_hdfsio_io_u_init(struct thread_data *td, struct io_u *io_u)
348 {
349         struct hdfsio_data *hd = td->io_ops_data;
350         struct hdfsio_options *options = td->eo;
351         int failure;
352         struct hdfsBuilder *bld;
353
354         if (options->host == NULL || options->port == 0) {
355                 log_err("hdfs: server not defined\n");
356                 return EINVAL;
357         }
358         
359         bld = hdfsNewBuilder();
360         if (!bld) {
361                 failure = errno;
362                 log_err("hdfs: unable to allocate connect builder\n");
363                 return failure;
364         }
365         hdfsBuilderSetNameNode(bld, options->host);
366         hdfsBuilderSetNameNodePort(bld, options->port);
367         if(! options->single_instance) {
368                 hdfsBuilderSetForceNewInstance(bld);
369         }
370         hd->fs = hdfsBuilderConnect(bld);
371         
372         /* hdfsSetWorkingDirectory succeed on non existend directory */
373         if (hdfsExists(hd->fs, options->directory) < 0 || hdfsSetWorkingDirectory(hd->fs, options->directory) < 0) {
374                 failure = errno;
375                 log_err("hdfs: invalid working directory %s: %s\n", options->directory, strerror(errno));
376                 return failure;
377         }
378         
379         return 0;
380 }
381
382 static void fio_hdfsio_io_u_free(struct thread_data *td, struct io_u *io_u)
383 {
384         struct hdfsio_data *hd = td->io_ops_data;
385
386         if (hd->fs && hdfsDisconnect(hd->fs) < 0) {
387                 log_err("hdfs: disconnect failed: %d\n", errno);
388         }
389 }
390
391 static struct ioengine_ops ioengine_hdfs = {
392         .name = "libhdfs",
393         .version = FIO_IOOPS_VERSION,
394         .flags = FIO_SYNCIO | FIO_DISKLESSIO | FIO_NODISKUTIL,
395         .setup = fio_hdfsio_setup,
396         .init = fio_hdfsio_init,
397         .prep = fio_hdfsio_prep,
398         .queue = fio_hdfsio_queue,
399         .open_file = fio_hdfsio_open_file,
400         .close_file = fio_hdfsio_close_file,
401         .io_u_init = fio_hdfsio_io_u_init,
402         .io_u_free = fio_hdfsio_io_u_free,
403         .option_struct_size     = sizeof(struct hdfsio_options),
404         .options                = options,
405 };
406
407
408 static void fio_init fio_hdfsio_register(void)
409 {
410         register_ioengine(&ioengine_hdfs);
411 }
412
413 static void fio_exit fio_hdfsio_unregister(void)
414 {
415         unregister_ioengine(&ioengine_hdfs);
416 }