engines/io_uring: ensure sqe stores are ordered SQ ring tail update
[fio.git] / engines / libhdfs.c
1 /*
2  * libhdfs engine
3  *
4  * this engine helps perform read/write operations on hdfs cluster using
5  * libhdfs. hdfs doesnot support modification of data once file is created.
6  *
7  * so to mimic that create many files of small size (e.g 256k), and this
8  * engine select a file based on the offset generated by fio.
9  *
10  * thus, random reads and writes can also be achieved with this logic.
11  *
12  */
13
14 #include <math.h>
15 #include <hdfs.h>
16
17 #include "../fio.h"
18 #include "../optgroup.h"
19
20 #define CHUNCK_NAME_LENGTH_MAX 80
21 #define CHUNCK_CREATION_BUFFER_SIZE 65536
22
23 struct hdfsio_data {
24         hdfsFS fs;
25         hdfsFile fp;
26         uint64_t curr_file_id;
27 };
28
29 struct hdfsio_options {
30         void *pad;                      /* needed because offset can't be 0 for a option defined used offsetof */
31         char *host;
32         char *directory;
33         unsigned int port;
34         unsigned int chunck_size;
35         unsigned int single_instance;
36         unsigned int use_direct;
37 };
38
39 static struct fio_option options[] = {
40         {
41                 .name   = "namenode",
42                 .lname  = "hfds namenode",
43                 .type   = FIO_OPT_STR_STORE,
44                 .off1   = offsetof(struct hdfsio_options, host),
45                 .def    = "localhost",
46                 .help   = "Namenode of the HDFS cluster",
47                 .category = FIO_OPT_C_ENGINE,
48                 .group  = FIO_OPT_G_HDFS,
49         },
50         {
51                 .name   = "hostname",
52                 .lname  = "hfds namenode",
53                 .type   = FIO_OPT_STR_STORE,
54                 .off1   = offsetof(struct hdfsio_options, host),
55                 .def    = "localhost",
56                 .help   = "Namenode of the HDFS cluster",
57                 .category = FIO_OPT_C_ENGINE,
58                 .group  = FIO_OPT_G_HDFS,
59         },
60         {
61                 .name   = "port",
62                 .lname  = "hdfs namenode port",
63                 .type   = FIO_OPT_INT,
64                 .off1   = offsetof(struct hdfsio_options, port),
65                 .def    = "9000",
66                 .minval = 1,
67                 .maxval = 65535,
68                 .help   = "Port used by the HDFS cluster namenode",
69                 .category = FIO_OPT_C_ENGINE,
70                 .group  = FIO_OPT_G_HDFS,
71         },
72         {
73                 .name   = "hdfsdirectory",
74                 .lname  = "hfds directory",
75                 .type   = FIO_OPT_STR_STORE,
76                 .off1   = offsetof(struct hdfsio_options, directory),
77                 .def    = "/",
78                 .help   = "The HDFS directory where fio will create chuncks",
79                 .category = FIO_OPT_C_ENGINE,
80                 .group  = FIO_OPT_G_HDFS,
81         },
82         {
83                 .name   = "chunk_size",
84                 .alias  = "chunck_size",
85                 .lname  = "Chunk size",
86                 .type   = FIO_OPT_INT,
87                 .off1   = offsetof(struct hdfsio_options, chunck_size),
88                 .def    = "1048576",
89                 .help   = "Size of individual chunck",
90                 .category = FIO_OPT_C_ENGINE,
91                 .group  = FIO_OPT_G_HDFS,
92         },
93         {
94                 .name   = "single_instance",
95                 .lname  = "Single Instance",
96                 .type   = FIO_OPT_BOOL,
97                 .off1   = offsetof(struct hdfsio_options, single_instance),
98                 .def    = "1",
99                 .help   = "Use a single instance",
100                 .category = FIO_OPT_C_ENGINE,
101                 .group  = FIO_OPT_G_HDFS,
102         },
103         {
104                 .name   = "hdfs_use_direct",
105                 .lname  = "HDFS Use Direct",
106                 .type   = FIO_OPT_BOOL,
107                 .off1   = offsetof(struct hdfsio_options, use_direct),
108                 .def    = "0",
109                 .help   = "Use readDirect instead of hdfsRead",
110                 .category = FIO_OPT_C_ENGINE,
111                 .group  = FIO_OPT_G_HDFS,
112         },
113         {
114                 .name   = NULL,
115         },
116 };
117
118
119 static int get_chunck_name(char *dest, char *file_name, uint64_t chunk_id) {
120         return snprintf(dest, CHUNCK_NAME_LENGTH_MAX, "%s_%lu", file_name, chunk_id);
121 }
122
123 static int fio_hdfsio_prep(struct thread_data *td, struct io_u *io_u)
124 {
125         struct hdfsio_options *options = td->eo;
126         struct hdfsio_data *hd = td->io_ops_data;
127         unsigned long f_id;
128         char fname[CHUNCK_NAME_LENGTH_MAX];
129         int open_flags;
130
131         /* find out file id based on the offset generated by fio */
132         f_id = floor(io_u->offset / options-> chunck_size);
133
134         if (f_id == hd->curr_file_id) {
135                 /* file is already open */
136                 return 0;
137         }
138
139         if (hd->curr_file_id != -1) {
140                 if ( hdfsCloseFile(hd->fs, hd->fp) == -1) {
141                         log_err("hdfs: unable to close file: %s\n", strerror(errno));
142                         return errno;
143                 }
144                 hd->curr_file_id = -1;
145         }
146
147         if (io_u->ddir == DDIR_READ || io_u->ddir == DDIR_SYNC) {
148                 open_flags = O_RDONLY;
149         } else if (io_u->ddir == DDIR_WRITE) {
150                 open_flags = O_WRONLY;
151         } else {
152                 log_err("hdfs: Invalid I/O Operation\n");
153                 return 0;
154         }
155         
156         get_chunck_name(fname, io_u->file->file_name, f_id);
157         hd->fp = hdfsOpenFile(hd->fs, fname, open_flags, 0, 0,
158                               options->chunck_size);
159         if(hd->fp == NULL) {
160                 log_err("hdfs: unable to open file: %s: %d\n", fname, strerror(errno));
161                 return errno;
162         }
163         hd->curr_file_id = f_id;
164
165         return 0;
166 }
167
168 static enum fio_q_status fio_hdfsio_queue(struct thread_data *td,
169                                           struct io_u *io_u)
170 {
171         struct hdfsio_data *hd = td->io_ops_data;
172         struct hdfsio_options *options = td->eo;
173         int ret;
174         unsigned long offset;
175         
176         offset = io_u->offset % options->chunck_size;
177         
178         if( (io_u->ddir == DDIR_READ || io_u->ddir == DDIR_WRITE) && 
179              hdfsTell(hd->fs, hd->fp) != offset && hdfsSeek(hd->fs, hd->fp, offset) != 0 ) {
180                 log_err("hdfs: seek failed: %s, are you doing random write smaller than chunck size ?\n", strerror(errno));
181                 io_u->error = errno;
182                 return FIO_Q_COMPLETED;
183         };
184
185         // do the IO
186         if (io_u->ddir == DDIR_READ) {
187                 if (options->use_direct) {
188                         ret = readDirect(hd->fs, hd->fp, io_u->xfer_buf, io_u->xfer_buflen);
189                 } else {
190                         ret = hdfsRead(hd->fs, hd->fp, io_u->xfer_buf, io_u->xfer_buflen);
191                 }
192         } else if (io_u->ddir == DDIR_WRITE) {
193                 ret = hdfsWrite(hd->fs, hd->fp, io_u->xfer_buf,
194                                 io_u->xfer_buflen);
195         } else if (io_u->ddir == DDIR_SYNC) {
196                 ret = hdfsFlush(hd->fs, hd->fp);
197         } else {
198                 log_err("hdfs: Invalid I/O Operation: %d\n", io_u->ddir);
199                 ret = EINVAL;
200         }
201
202         // Check if the IO went fine, or is incomplete
203         if (ret != (int)io_u->xfer_buflen) {
204                 if (ret >= 0) {
205                         io_u->resid = io_u->xfer_buflen - ret;
206                         io_u->error = 0;
207                         return FIO_Q_COMPLETED;
208                 } else {
209                         io_u->error = errno;
210                 }
211         }
212
213         if (io_u->error)
214                 td_verror(td, io_u->error, "xfer");
215
216         return FIO_Q_COMPLETED;
217 }
218
219 int fio_hdfsio_open_file(struct thread_data *td, struct fio_file *f)
220 {
221         if (td->o.odirect) {
222                 td->error = EINVAL;
223                 return 0;
224         }
225
226         return 0;
227 }
228
229 int fio_hdfsio_close_file(struct thread_data *td, struct fio_file *f)
230 {
231         struct hdfsio_data *hd = td->io_ops_data;
232
233         if (hd->curr_file_id != -1) {
234                 if ( hdfsCloseFile(hd->fs, hd->fp) == -1) {
235                         log_err("hdfs: unable to close file: %s\n", strerror(errno));
236                         return errno;
237                 }
238                 hd->curr_file_id = -1;
239         }
240         return 0;
241 }
242
243 static int fio_hdfsio_init(struct thread_data *td)
244 {
245         struct hdfsio_options *options = td->eo;
246         struct hdfsio_data *hd = td->io_ops_data;
247         struct fio_file *f;
248         uint64_t j,k;
249         int i, failure = 0;
250         uint8_t buffer[CHUNCK_CREATION_BUFFER_SIZE];
251         uint64_t bytes_left;    
252         char fname[CHUNCK_NAME_LENGTH_MAX];     
253         hdfsFile fp;
254         hdfsFileInfo *fi;
255         tOffset fi_size;
256
257         for_each_file(td, f, i) {
258                 k = 0;
259                 for(j=0; j < f->real_file_size; j += options->chunck_size) {
260                         get_chunck_name(fname, f->file_name, k++);
261                         fi = hdfsGetPathInfo(hd->fs, fname);
262                         fi_size = fi ? fi->mSize : 0;
263                         // fill exist and is big enough, nothing to do
264                         if( fi && fi_size >= options->chunck_size) {
265                                 continue;
266                         }
267                         fp = hdfsOpenFile(hd->fs, fname, O_WRONLY, 0, 0,
268                                           options->chunck_size);
269                         if(fp == NULL) {
270                                 failure = errno;
271                                 log_err("hdfs: unable to prepare file chunk %s: %s\n", fname, strerror(errno));
272                                 break;
273                         }
274                         bytes_left = options->chunck_size;
275                         memset(buffer, 0, CHUNCK_CREATION_BUFFER_SIZE);
276                         while( bytes_left > CHUNCK_CREATION_BUFFER_SIZE) {
277                                 if( hdfsWrite(hd->fs, fp, buffer, CHUNCK_CREATION_BUFFER_SIZE)
278                                     != CHUNCK_CREATION_BUFFER_SIZE) {
279                                         failure = errno;
280                                         log_err("hdfs: unable to prepare file chunk %s: %s\n", fname, strerror(errno));
281                                         break;
282                                 };
283                                 bytes_left -= CHUNCK_CREATION_BUFFER_SIZE;
284                         }
285                         if(bytes_left > 0) {
286                                 if( hdfsWrite(hd->fs, fp, buffer, bytes_left)
287                                     != bytes_left) {
288                                         failure = errno;
289                                         break;
290                                 };
291                         }
292                         if( hdfsCloseFile(hd->fs, fp) != 0) {
293                                 failure = errno;
294                                 log_err("hdfs: unable to prepare file chunk %s: %s\n", fname, strerror(errno));
295                                 break;
296                         }
297                 }
298                 if(failure) {
299                         break;
300                 }
301         }
302         
303         if( !failure ) {
304                 fio_file_set_size_known(f);
305         }
306
307         return failure;
308 }
309
310 static int fio_hdfsio_setup(struct thread_data *td)
311 {
312         struct hdfsio_data *hd;
313         struct fio_file *f;
314         int i;
315         uint64_t file_size, total_file_size;
316
317         if (!td->io_ops_data) {
318                 hd = malloc(sizeof(*hd));
319                 memset(hd, 0, sizeof(*hd));
320                 
321                 hd->curr_file_id = -1;
322
323                 td->io_ops_data = hd;
324         }
325         
326         total_file_size = 0;
327         file_size = 0;
328
329         for_each_file(td, f, i) {
330                 if(!td->o.file_size_low) {
331                         file_size = floor(td->o.size / td->o.nr_files);
332                         total_file_size += file_size;
333                 }
334                 else if (td->o.file_size_low == td->o.file_size_high)
335                         file_size = td->o.file_size_low;
336                 else {
337                         file_size = get_rand_file_size(td);
338                 }
339                 f->real_file_size = file_size;
340         }
341         /* If the size doesn't divide nicely with the chunck size,
342          * make the last files bigger.
343          * Used only if filesize was not explicitely given
344          */
345         if (!td->o.file_size_low && total_file_size < td->o.size) {
346                 f->real_file_size += (td->o.size - total_file_size);
347         }
348
349         return 0;
350 }
351
352 static int fio_hdfsio_io_u_init(struct thread_data *td, struct io_u *io_u)
353 {
354         struct hdfsio_data *hd = td->io_ops_data;
355         struct hdfsio_options *options = td->eo;
356         int failure;
357         struct hdfsBuilder *bld;
358
359         if (options->host == NULL || options->port == 0) {
360                 log_err("hdfs: server not defined\n");
361                 return EINVAL;
362         }
363         
364         bld = hdfsNewBuilder();
365         if (!bld) {
366                 failure = errno;
367                 log_err("hdfs: unable to allocate connect builder\n");
368                 return failure;
369         }
370         hdfsBuilderSetNameNode(bld, options->host);
371         hdfsBuilderSetNameNodePort(bld, options->port);
372         if(! options->single_instance) {
373                 hdfsBuilderSetForceNewInstance(bld);
374         }
375         hd->fs = hdfsBuilderConnect(bld);
376         
377         /* hdfsSetWorkingDirectory succeed on non existend directory */
378         if (hdfsExists(hd->fs, options->directory) < 0 || hdfsSetWorkingDirectory(hd->fs, options->directory) < 0) {
379                 failure = errno;
380                 log_err("hdfs: invalid working directory %s: %s\n", options->directory, strerror(errno));
381                 return failure;
382         }
383         
384         return 0;
385 }
386
387 static void fio_hdfsio_io_u_free(struct thread_data *td, struct io_u *io_u)
388 {
389         struct hdfsio_data *hd = td->io_ops_data;
390
391         if (hd->fs && hdfsDisconnect(hd->fs) < 0) {
392                 log_err("hdfs: disconnect failed: %d\n", errno);
393         }
394 }
395
396 static struct ioengine_ops ioengine_hdfs = {
397         .name = "libhdfs",
398         .version = FIO_IOOPS_VERSION,
399         .flags = FIO_SYNCIO | FIO_DISKLESSIO | FIO_NODISKUTIL,
400         .setup = fio_hdfsio_setup,
401         .init = fio_hdfsio_init,
402         .prep = fio_hdfsio_prep,
403         .queue = fio_hdfsio_queue,
404         .open_file = fio_hdfsio_open_file,
405         .close_file = fio_hdfsio_close_file,
406         .io_u_init = fio_hdfsio_io_u_init,
407         .io_u_free = fio_hdfsio_io_u_free,
408         .option_struct_size     = sizeof(struct hdfsio_options),
409         .options                = options,
410 };
411
412
413 static void fio_init fio_hdfsio_register(void)
414 {
415         register_ioengine(&ioengine_hdfs);
416 }
417
418 static void fio_exit fio_hdfsio_unregister(void)
419 {
420         unregister_ioengine(&ioengine_hdfs);
421 }