fio: add libpmem engine
[fio.git] / engines / libhdfs.c
CommitLineData
1b10477b
MM
/*
 * libhdfs engine
 *
 * This engine performs read/write operations on an HDFS cluster using
 * libhdfs. HDFS does not support modification of data once a file is created.
 *
 * To work around that, the engine creates many files of small size
 * (e.g. 256k) and selects a file based on the offset generated by fio.
 *
 * Thus, random reads and writes can also be achieved with this logic.
 *
 */
13
a3f001f5
FB
14#include <math.h>
15#include <hdfs.h>
1b10477b
MM
16
17#include "../fio.h"
d220c761 18#include "../optgroup.h"
a3f001f5
FB
19
/* Size, in bytes, of the buffers that hold a generated chunk file name. */
#define CHUNCK_NAME_LENGTH_MAX		80
/* Size, in bytes, of the scratch buffer used when pre-filling chunk files. */
#define CHUNCK_CREATION_BUFFER_SIZE	65536
1b10477b
MM
22
23struct hdfsio_data {
1b10477b
MM
24 hdfsFS fs;
25 hdfsFile fp;
a3f001f5 26 uint64_t curr_file_id;
1b10477b
MM
27};
28
a3f001f5
FB
/*
 * Engine-specific option values. The options[] table addresses each member
 * through offsetof().
 */
struct hdfsio_options {
	/*
	 * Placeholder only: an option located through offsetof() cannot live
	 * at offset 0, so no real option may be the first member.
	 */
	void *pad;
	/* namenode host ("namenode" / "hostname" options) */
	char *host;
	/* HDFS directory in which the chunk files live ("hdfsdirectory") */
	char *directory;
	/* namenode port ("port") */
	unsigned int port;
	/* size in bytes of one chunk file ("chunk_size") */
	unsigned int chunck_size;
	/* when zero, a new HDFS instance is forced ("single_instance") */
	unsigned int single_instance;
	/* when non-zero, reads use readDirect() ("hdfs_use_direct") */
	unsigned int use_direct;
};
1b10477b 38
a3f001f5
FB
39static struct fio_option options[] = {
40 {
41 .name = "namenode",
42 .lname = "hfds namenode",
43 .type = FIO_OPT_STR_STORE,
44 .off1 = offsetof(struct hdfsio_options, host),
45 .def = "localhost",
46 .help = "Namenode of the HDFS cluster",
47 .category = FIO_OPT_C_ENGINE,
48 .group = FIO_OPT_G_HDFS,
49 },
50 {
51 .name = "hostname",
52 .lname = "hfds namenode",
53 .type = FIO_OPT_STR_STORE,
54 .off1 = offsetof(struct hdfsio_options, host),
55 .def = "localhost",
56 .help = "Namenode of the HDFS cluster",
57 .category = FIO_OPT_C_ENGINE,
58 .group = FIO_OPT_G_HDFS,
59 },
60 {
61 .name = "port",
62 .lname = "hdfs namenode port",
63 .type = FIO_OPT_INT,
64 .off1 = offsetof(struct hdfsio_options, port),
65 .def = "9000",
66 .minval = 1,
67 .maxval = 65535,
68 .help = "Port used by the HDFS cluster namenode",
69 .category = FIO_OPT_C_ENGINE,
70 .group = FIO_OPT_G_HDFS,
71 },
72 {
73 .name = "hdfsdirectory",
74 .lname = "hfds directory",
75 .type = FIO_OPT_STR_STORE,
76 .off1 = offsetof(struct hdfsio_options, directory),
77 .def = "/",
78 .help = "The HDFS directory where fio will create chuncks",
79 .category = FIO_OPT_C_ENGINE,
80 .group = FIO_OPT_G_HDFS,
81 },
82 {
dda13f44
JA
83 .name = "chunk_size",
84 .alias = "chunck_size",
85 .lname = "Chunk size",
a3f001f5
FB
86 .type = FIO_OPT_INT,
87 .off1 = offsetof(struct hdfsio_options, chunck_size),
88 .def = "1048576",
89 .help = "Size of individual chunck",
90 .category = FIO_OPT_C_ENGINE,
91 .group = FIO_OPT_G_HDFS,
92 },
93 {
94 .name = "single_instance",
dda13f44 95 .lname = "Single Instance",
a3f001f5
FB
96 .type = FIO_OPT_BOOL,
97 .off1 = offsetof(struct hdfsio_options, single_instance),
98 .def = "1",
99 .help = "Use a single instance",
100 .category = FIO_OPT_C_ENGINE,
101 .group = FIO_OPT_G_HDFS,
102 },
103 {
104 .name = "hdfs_use_direct",
dda13f44 105 .lname = "HDFS Use Direct",
a3f001f5
FB
106 .type = FIO_OPT_BOOL,
107 .off1 = offsetof(struct hdfsio_options, use_direct),
108 .def = "0",
109 .help = "Use readDirect instead of hdfsRead",
110 .category = FIO_OPT_C_ENGINE,
111 .group = FIO_OPT_G_HDFS,
112 },
113 {
114 .name = NULL,
115 },
116};
117
118
119static int get_chunck_name(char *dest, char *file_name, uint64_t chunk_id) {
120 return snprintf(dest, CHUNCK_NAME_LENGTH_MAX, "%s_%lu", file_name, chunk_id);
1b10477b
MM
121}
122
123static int fio_hdfsio_prep(struct thread_data *td, struct io_u *io_u)
124{
a3f001f5 125 struct hdfsio_options *options = td->eo;
565e784d 126 struct hdfsio_data *hd = td->io_ops_data;
1b10477b 127 unsigned long f_id;
a3f001f5
FB
128 char fname[CHUNCK_NAME_LENGTH_MAX];
129 int open_flags;
1b10477b
MM
130
131 /* find out file id based on the offset generated by fio */
a3f001f5 132 f_id = floor(io_u->offset / options-> chunck_size);
1b10477b
MM
133
134 if (f_id == hd->curr_file_id) {
135 /* file is already open */
136 return 0;
137 }
138
139 if (hd->curr_file_id != -1) {
a3f001f5
FB
140 if ( hdfsCloseFile(hd->fs, hd->fp) == -1) {
141 log_err("hdfs: unable to close file: %s\n", strerror(errno));
142 return errno;
143 }
144 hd->curr_file_id = -1;
1b10477b
MM
145 }
146
a3f001f5 147 if (io_u->ddir == DDIR_READ || io_u->ddir == DDIR_SYNC) {
1b10477b
MM
148 open_flags = O_RDONLY;
149 } else if (io_u->ddir == DDIR_WRITE) {
150 open_flags = O_WRONLY;
151 } else {
7d4a8e7e 152 log_err("hdfs: Invalid I/O Operation\n");
a3f001f5
FB
153 return 0;
154 }
155
156 get_chunck_name(fname, io_u->file->file_name, f_id);
157 hd->fp = hdfsOpenFile(hd->fs, fname, open_flags, 0, 0,
158 options->chunck_size);
159 if(hd->fp == NULL) {
160 log_err("hdfs: unable to open file: %s: %d\n", fname, strerror(errno));
161 return errno;
1b10477b 162 }
1b10477b 163 hd->curr_file_id = f_id;
1b10477b
MM
164
165 return 0;
166}
167
a3f001f5 168static int fio_hdfsio_queue(struct thread_data *td, struct io_u *io_u)
1b10477b 169{
565e784d 170 struct hdfsio_data *hd = td->io_ops_data;
a3f001f5
FB
171 struct hdfsio_options *options = td->eo;
172 int ret;
173 unsigned long offset;
174
175 offset = io_u->offset % options->chunck_size;
176
177 if( (io_u->ddir == DDIR_READ || io_u->ddir == DDIR_WRITE) &&
178 hdfsTell(hd->fs, hd->fp) != offset && hdfsSeek(hd->fs, hd->fp, offset) != 0 ) {
179 log_err("hdfs: seek failed: %s, are you doing random write smaller than chunck size ?\n", strerror(errno));
180 io_u->error = errno;
181 return FIO_Q_COMPLETED;
182 };
183
184 // do the IO
185 if (io_u->ddir == DDIR_READ) {
186 if (options->use_direct) {
187 ret = readDirect(hd->fs, hd->fp, io_u->xfer_buf, io_u->xfer_buflen);
188 } else {
189 ret = hdfsRead(hd->fs, hd->fp, io_u->xfer_buf, io_u->xfer_buflen);
190 }
191 } else if (io_u->ddir == DDIR_WRITE) {
192 ret = hdfsWrite(hd->fs, hd->fp, io_u->xfer_buf,
193 io_u->xfer_buflen);
194 } else if (io_u->ddir == DDIR_SYNC) {
195 ret = hdfsFlush(hd->fs, hd->fp);
196 } else {
197 log_err("hdfs: Invalid I/O Operation: %d\n", io_u->ddir);
198 ret = EINVAL;
199 }
200
201 // Check if the IO went fine, or is incomplete
1b10477b
MM
202 if (ret != (int)io_u->xfer_buflen) {
203 if (ret >= 0) {
204 io_u->resid = io_u->xfer_buflen - ret;
205 io_u->error = 0;
206 return FIO_Q_COMPLETED;
a3f001f5 207 } else {
1b10477b 208 io_u->error = errno;
a3f001f5 209 }
1b10477b
MM
210 }
211
212 if (io_u->error)
213 td_verror(td, io_u->error, "xfer");
214
215 return FIO_Q_COMPLETED;
216}
217
a3f001f5 218int fio_hdfsio_open_file(struct thread_data *td, struct fio_file *f)
1b10477b 219{
a3f001f5
FB
220 if (td->o.odirect) {
221 td->error = EINVAL;
222 return 0;
1b10477b
MM
223 }
224
a3f001f5 225 return 0;
1b10477b
MM
226}
227
a3f001f5 228int fio_hdfsio_close_file(struct thread_data *td, struct fio_file *f)
1b10477b 229{
565e784d 230 struct hdfsio_data *hd = td->io_ops_data;
1b10477b 231
a3f001f5
FB
232 if (hd->curr_file_id != -1) {
233 if ( hdfsCloseFile(hd->fs, hd->fp) == -1) {
234 log_err("hdfs: unable to close file: %s\n", strerror(errno));
235 return errno;
236 }
237 hd->curr_file_id = -1;
238 }
1b10477b
MM
239 return 0;
240}
241
a3f001f5 242static int fio_hdfsio_init(struct thread_data *td)
1b10477b 243{
a3f001f5 244 struct hdfsio_options *options = td->eo;
565e784d 245 struct hdfsio_data *hd = td->io_ops_data;
a3f001f5
FB
246 struct fio_file *f;
247 uint64_t j,k;
248 int i, failure = 0;
249 uint8_t buffer[CHUNCK_CREATION_BUFFER_SIZE];
250 uint64_t bytes_left;
251 char fname[CHUNCK_NAME_LENGTH_MAX];
252 hdfsFile fp;
253 hdfsFileInfo *fi;
254 tOffset fi_size;
255
256 for_each_file(td, f, i) {
257 k = 0;
258 for(j=0; j < f->real_file_size; j += options->chunck_size) {
259 get_chunck_name(fname, f->file_name, k++);
260 fi = hdfsGetPathInfo(hd->fs, fname);
261 fi_size = fi ? fi->mSize : 0;
262 // fill exist and is big enough, nothing to do
263 if( fi && fi_size >= options->chunck_size) {
264 continue;
265 }
266 fp = hdfsOpenFile(hd->fs, fname, O_WRONLY, 0, 0,
267 options->chunck_size);
268 if(fp == NULL) {
269 failure = errno;
270 log_err("hdfs: unable to prepare file chunk %s: %s\n", fname, strerror(errno));
271 break;
272 }
273 bytes_left = options->chunck_size;
274 memset(buffer, 0, CHUNCK_CREATION_BUFFER_SIZE);
275 while( bytes_left > CHUNCK_CREATION_BUFFER_SIZE) {
276 if( hdfsWrite(hd->fs, fp, buffer, CHUNCK_CREATION_BUFFER_SIZE)
277 != CHUNCK_CREATION_BUFFER_SIZE) {
278 failure = errno;
279 log_err("hdfs: unable to prepare file chunk %s: %s\n", fname, strerror(errno));
280 break;
281 };
282 bytes_left -= CHUNCK_CREATION_BUFFER_SIZE;
283 }
284 if(bytes_left > 0) {
285 if( hdfsWrite(hd->fs, fp, buffer, bytes_left)
286 != bytes_left) {
287 failure = errno;
288 break;
289 };
290 }
291 if( hdfsCloseFile(hd->fs, fp) != 0) {
292 failure = errno;
293 log_err("hdfs: unable to prepare file chunk %s: %s\n", fname, strerror(errno));
294 break;
295 }
296 }
297 if(failure) {
298 break;
299 }
300 }
301
302 if( !failure ) {
303 fio_file_set_size_known(f);
304 }
1b10477b 305
a3f001f5 306 return failure;
1b10477b
MM
307}
308
309static int fio_hdfsio_setup(struct thread_data *td)
310{
311 struct hdfsio_data *hd;
312 struct fio_file *f;
a3f001f5
FB
313 int i;
314 uint64_t file_size, total_file_size;
1b10477b 315
565e784d 316 if (!td->io_ops_data) {
a3f001f5 317 hd = malloc(sizeof(*hd));
1b10477b 318 memset(hd, 0, sizeof(*hd));
a3f001f5
FB
319
320 hd->curr_file_id = -1;
1b10477b 321
565e784d 322 td->io_ops_data = hd;
a3f001f5
FB
323 }
324
325 total_file_size = 0;
326 file_size = 0;
327
328 for_each_file(td, f, i) {
329 if(!td->o.file_size_low) {
330 file_size = floor(td->o.size / td->o.nr_files);
331 total_file_size += file_size;
1b10477b 332 }
a3f001f5
FB
333 else if (td->o.file_size_low == td->o.file_size_high)
334 file_size = td->o.file_size_low;
335 else {
336 file_size = get_rand_file_size(td);
337 }
338 f->real_file_size = file_size;
1b10477b 339 }
a3f001f5
FB
340 /* If the size doesn't divide nicely with the chunck size,
341 * make the last files bigger.
342 * Used only if filesize was not explicitely given
343 */
344 if (!td->o.file_size_low && total_file_size < td->o.size) {
345 f->real_file_size += (td->o.size - total_file_size);
346 }
347
348 return 0;
349}
1b10477b 350
a3f001f5
FB
351static int fio_hdfsio_io_u_init(struct thread_data *td, struct io_u *io_u)
352{
565e784d 353 struct hdfsio_data *hd = td->io_ops_data;
a3f001f5
FB
354 struct hdfsio_options *options = td->eo;
355 int failure;
356 struct hdfsBuilder *bld;
357
358 if (options->host == NULL || options->port == 0) {
359 log_err("hdfs: server not defined\n");
360 return EINVAL;
361 }
362
363 bld = hdfsNewBuilder();
364 if (!bld) {
365 failure = errno;
366 log_err("hdfs: unable to allocate connect builder\n");
367 return failure;
368 }
369 hdfsBuilderSetNameNode(bld, options->host);
370 hdfsBuilderSetNameNodePort(bld, options->port);
371 if(! options->single_instance) {
372 hdfsBuilderSetForceNewInstance(bld);
373 }
374 hd->fs = hdfsBuilderConnect(bld);
375
376 /* hdfsSetWorkingDirectory succeed on non existend directory */
377 if (hdfsExists(hd->fs, options->directory) < 0 || hdfsSetWorkingDirectory(hd->fs, options->directory) < 0) {
378 failure = errno;
379 log_err("hdfs: invalid working directory %s: %s\n", options->directory, strerror(errno));
380 return failure;
381 }
382
1b10477b
MM
383 return 0;
384}
385
a3f001f5
FB
386static void fio_hdfsio_io_u_free(struct thread_data *td, struct io_u *io_u)
387{
565e784d 388 struct hdfsio_data *hd = td->io_ops_data;
a3f001f5
FB
389
390 if (hd->fs && hdfsDisconnect(hd->fs) < 0) {
391 log_err("hdfs: disconnect failed: %d\n", errno);
392 }
393}
394
1b10477b
MM
395static struct ioengine_ops ioengine_hdfs = {
396 .name = "libhdfs",
397 .version = FIO_IOOPS_VERSION,
a3f001f5 398 .flags = FIO_SYNCIO | FIO_DISKLESSIO | FIO_NODISKUTIL,
1b10477b 399 .setup = fio_hdfsio_setup,
a3f001f5 400 .init = fio_hdfsio_init,
1b10477b
MM
401 .prep = fio_hdfsio_prep,
402 .queue = fio_hdfsio_queue,
403 .open_file = fio_hdfsio_open_file,
404 .close_file = fio_hdfsio_close_file,
a3f001f5
FB
405 .io_u_init = fio_hdfsio_io_u_init,
406 .io_u_free = fio_hdfsio_io_u_free,
407 .option_struct_size = sizeof(struct hdfsio_options),
408 .options = options,
1b10477b
MM
409};
410
a3f001f5 411
1b10477b
MM
412static void fio_init fio_hdfsio_register(void)
413{
414 register_ioengine(&ioengine_hdfs);
415}
416
417static void fio_exit fio_hdfsio_unregister(void)
418{
419 unregister_ioengine(&ioengine_hdfs);
420}