/*
 * libhdfs engine
 *
 * This engine performs read/write operations on an HDFS cluster using
 * libhdfs. HDFS does not support modification of data once a file has
 * been created.
 *
 * To work around that, the engine creates many files of a small size
 * (e.g. 256k) and selects one of them based on the offset generated by
 * fio.
 *
 * Random reads and writes can thus also be achieved with this logic.
 */
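/*
 * Illustrative example (the file name "testfile" is hypothetical): with
 * the default chunck_size of 1048576 bytes, an I/O issued by fio at
 * offset 3000000 is routed to the chunk file "testfile_2"
 * (3000000 / 1048576 = 2) at intra-chunk offset 902848
 * (3000000 % 1048576), as computed in fio_hdfsio_prep() and
 * fio_hdfsio_queue() below.
 */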
#include <math.h>
#include <hdfs.h>

#include "../fio.h"

#define CHUNCK_NAME_LENGTH_MAX 80
#define CHUNCK_CREATION_BUFFER_SIZE 65536

struct hdfsio_data {
	hdfsFS fs;
	hdfsFile fp;
	uint64_t curr_file_id;
};

struct hdfsio_options {
	void *pad;	/* needed because offset can't be 0 for an option defined with offsetof() */
	char *host;
	char *directory;
	unsigned int port;
	unsigned int chunck_size;
	unsigned int single_instance;
	unsigned int use_direct;
};
static struct fio_option options[] = {
	{
		.name	= "namenode",
		.lname	= "hdfs namenode",
		.type	= FIO_OPT_STR_STORE,
		.off1	= offsetof(struct hdfsio_options, host),
		.def	= "localhost",
		.help	= "Namenode of the HDFS cluster",
		.category = FIO_OPT_C_ENGINE,
		.group	= FIO_OPT_G_HDFS,
	},
	{
		.name	= "hostname",
		.lname	= "hdfs namenode",
		.type	= FIO_OPT_STR_STORE,
		.off1	= offsetof(struct hdfsio_options, host),
		.def	= "localhost",
		.help	= "Namenode of the HDFS cluster",
		.category = FIO_OPT_C_ENGINE,
		.group	= FIO_OPT_G_HDFS,
	},
	{
		.name	= "port",
		.lname	= "hdfs namenode port",
		.type	= FIO_OPT_INT,
		.off1	= offsetof(struct hdfsio_options, port),
		.def	= "9000",
		.minval	= 1,
		.maxval	= 65535,
		.help	= "Port used by the HDFS cluster namenode",
		.category = FIO_OPT_C_ENGINE,
		.group	= FIO_OPT_G_HDFS,
	},
	{
		.name	= "hdfsdirectory",
		.lname	= "hdfs directory",
		.type	= FIO_OPT_STR_STORE,
		.off1	= offsetof(struct hdfsio_options, directory),
		.def	= "/",
		.help	= "The HDFS directory where fio will create the chunk files",
		.category = FIO_OPT_C_ENGINE,
		.group	= FIO_OPT_G_HDFS,
	},
	{
		.name	= "chunck_size",
		.type	= FIO_OPT_INT,
		.off1	= offsetof(struct hdfsio_options, chunck_size),
		.def	= "1048576",
		.help	= "Size of an individual chunk file",
		.category = FIO_OPT_C_ENGINE,
		.group	= FIO_OPT_G_HDFS,
	},
	{
		.name	= "single_instance",
		.type	= FIO_OPT_BOOL,
		.off1	= offsetof(struct hdfsio_options, single_instance),
		.def	= "1",
		.help	= "Use a single HDFS client instance",
		.category = FIO_OPT_C_ENGINE,
		.group	= FIO_OPT_G_HDFS,
	},
	{
		.name	= "hdfs_use_direct",
		.type	= FIO_OPT_BOOL,
		.off1	= offsetof(struct hdfsio_options, use_direct),
		.def	= "0",
		.help	= "Use readDirect instead of hdfsRead",
		.category = FIO_OPT_C_ENGINE,
		.group	= FIO_OPT_G_HDFS,
	},
	{
		.name	= NULL,
	},
};
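/*
 * A minimal example job using the options above. The values, job name
 * and paths are only illustrative; adjust namenode, port and
 * hdfsdirectory to match an actual cluster:
 *
 *   [global]
 *   ioengine=libhdfs
 *   namenode=localhost
 *   port=9000
 *   hdfsdirectory=/fio
 *   chunck_size=262144
 *
 *   [hdfs-randread]
 *   rw=randread
 *   bs=4k
 *   size=64m
 *   filename=testfile
 */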
static int get_chunck_name(char *dest, char *file_name, uint64_t chunk_id) {
	return snprintf(dest, CHUNCK_NAME_LENGTH_MAX, "%s_%lu", file_name,
			(unsigned long) chunk_id);
}
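/*
 * Map the offset generated by fio to a chunk id and make sure the
 * matching chunk file is the one currently open, closing the previously
 * open chunk if needed.
 */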
static int fio_hdfsio_prep(struct thread_data *td, struct io_u *io_u)
{
	struct hdfsio_options *options = td->eo;
	struct hdfsio_data *hd = td->io_ops->data;
	unsigned long f_id;
	char fname[CHUNCK_NAME_LENGTH_MAX];
	int open_flags;

	/* find out the file id based on the offset generated by fio */
	f_id = floor(io_u->offset / options->chunck_size);

	if (f_id == hd->curr_file_id) {
		/* file is already open */
		return 0;
	}

	if (hd->curr_file_id != -1) {
		if (hdfsCloseFile(hd->fs, hd->fp) == -1) {
			log_err("hdfs: unable to close file: %s\n", strerror(errno));
			return errno;
		}
		hd->curr_file_id = -1;
	}

	if (io_u->ddir == DDIR_READ || io_u->ddir == DDIR_SYNC) {
		open_flags = O_RDONLY;
	} else if (io_u->ddir == DDIR_WRITE) {
		open_flags = O_WRONLY;
	} else {
		log_err("hdfs: Invalid I/O Operation\n");
		return EINVAL;
	}

	get_chunck_name(fname, io_u->file->file_name, f_id);
	hd->fp = hdfsOpenFile(hd->fs, fname, open_flags, 0, 0,
			      options->chunck_size);
	if (hd->fp == NULL) {
		log_err("hdfs: unable to open file: %s: %s\n", fname, strerror(errno));
		return errno;
	}
	hd->curr_file_id = f_id;

	return 0;
}
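/*
 * Issue a single I/O against the currently open chunk: seek to the
 * intra-chunk offset, then read (optionally via readDirect), write or
 * flush depending on the data direction.
 */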
static int fio_hdfsio_queue(struct thread_data *td, struct io_u *io_u)
{
	struct hdfsio_data *hd = td->io_ops->data;
	struct hdfsio_options *options = td->eo;
	int ret;
	unsigned long offset;

	offset = io_u->offset % options->chunck_size;

	if ((io_u->ddir == DDIR_READ || io_u->ddir == DDIR_WRITE) &&
	    hdfsTell(hd->fs, hd->fp) != offset && hdfsSeek(hd->fs, hd->fp, offset) != 0) {
		log_err("hdfs: seek failed: %s, are you doing random writes smaller than the chunck size?\n", strerror(errno));
		io_u->error = errno;
		return FIO_Q_COMPLETED;
	}

	/* do the IO */
	if (io_u->ddir == DDIR_READ) {
		if (options->use_direct) {
			ret = readDirect(hd->fs, hd->fp, io_u->xfer_buf, io_u->xfer_buflen);
		} else {
			ret = hdfsRead(hd->fs, hd->fp, io_u->xfer_buf, io_u->xfer_buflen);
		}
	} else if (io_u->ddir == DDIR_WRITE) {
		ret = hdfsWrite(hd->fs, hd->fp, io_u->xfer_buf,
				io_u->xfer_buflen);
	} else if (io_u->ddir == DDIR_SYNC) {
		ret = hdfsFlush(hd->fs, hd->fp);
	} else {
		log_err("hdfs: Invalid I/O Operation: %d\n", io_u->ddir);
		ret = EINVAL;
	}

	/* check whether the IO went fine, or is incomplete */
	if (ret != (int)io_u->xfer_buflen) {
		if (ret >= 0) {
			io_u->resid = io_u->xfer_buflen - ret;
			io_u->error = 0;
			return FIO_Q_COMPLETED;
		} else {
			io_u->error = errno;
		}
	}

	if (io_u->error)
		td_verror(td, io_u->error, "xfer");

	return FIO_Q_COMPLETED;
}
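/*
 * Chunk files are opened on demand in fio_hdfsio_prep(), so there is
 * nothing to do here beyond flagging direct=1 as unsupported.
 */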
int fio_hdfsio_open_file(struct thread_data *td, struct fio_file *f)
{
	if (td->o.odirect) {
		td->error = EINVAL;
		return 0;
	}

	return 0;
}
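/*
 * Close whatever chunk file is currently open, if any.
 */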
int fio_hdfsio_close_file(struct thread_data *td, struct fio_file *f)
{
	struct hdfsio_data *hd = td->io_ops->data;

	if (hd->curr_file_id != -1) {
		if (hdfsCloseFile(hd->fs, hd->fp) == -1) {
			log_err("hdfs: unable to close file: %s\n", strerror(errno));
			return errno;
		}
		hd->curr_file_id = -1;
	}
	return 0;
}
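/*
 * Pre-create the chunk files backing every fio file: each chunk that
 * does not yet exist, or is smaller than chunck_size, is created and
 * filled with zeroes.
 */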
static int fio_hdfsio_init(struct thread_data *td)
{
	struct hdfsio_options *options = td->eo;
	struct hdfsio_data *hd = td->io_ops->data;
	struct fio_file *f;
	uint64_t j, k;
	int i, failure = 0;
	uint8_t buffer[CHUNCK_CREATION_BUFFER_SIZE];
	uint64_t bytes_left;
	char fname[CHUNCK_NAME_LENGTH_MAX];
	hdfsFile fp;
	hdfsFileInfo *fi;
	tOffset fi_size;

	for_each_file(td, f, i) {
		k = 0;
		for (j = 0; j < f->real_file_size; j += options->chunck_size) {
			get_chunck_name(fname, f->file_name, k++);
			fi = hdfsGetPathInfo(hd->fs, fname);
			fi_size = fi ? fi->mSize : 0;
			/* file exists and is big enough, nothing to do */
			if (fi && fi_size >= options->chunck_size) {
				continue;
			}
			fp = hdfsOpenFile(hd->fs, fname, O_WRONLY, 0, 0,
					  options->chunck_size);
			if (fp == NULL) {
				failure = errno;
				log_err("hdfs: unable to prepare file chunk %s: %s\n", fname, strerror(errno));
				break;
			}
			bytes_left = options->chunck_size;
			memset(buffer, 0, CHUNCK_CREATION_BUFFER_SIZE);
			while (bytes_left > CHUNCK_CREATION_BUFFER_SIZE) {
				if (hdfsWrite(hd->fs, fp, buffer, CHUNCK_CREATION_BUFFER_SIZE)
				    != CHUNCK_CREATION_BUFFER_SIZE) {
					failure = errno;
					log_err("hdfs: unable to prepare file chunk %s: %s\n", fname, strerror(errno));
					break;
				}
				bytes_left -= CHUNCK_CREATION_BUFFER_SIZE;
			}
			if (bytes_left > 0) {
				if (hdfsWrite(hd->fs, fp, buffer, bytes_left)
				    != bytes_left) {
					failure = errno;
					break;
				}
			}
			if (hdfsCloseFile(hd->fs, fp) != 0) {
				failure = errno;
				log_err("hdfs: unable to prepare file chunk %s: %s\n", fname, strerror(errno));
				break;
			}
		}
		if (failure) {
			break;
		}
	}

	if (!failure) {
		fio_file_set_size_known(f);
	}

	return failure;
}
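/*
 * Allocate the per-thread engine data and compute the logical size of
 * every fio file, from which the number of chunk files is later
 * derived in fio_hdfsio_init().
 */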
static int fio_hdfsio_setup(struct thread_data *td)
{
	struct hdfsio_data *hd;
	struct fio_file *f;
	int i;
	uint64_t file_size, total_file_size;

	if (!td->io_ops->data) {
		hd = malloc(sizeof(*hd));
		memset(hd, 0, sizeof(*hd));

		hd->curr_file_id = -1;

		td->io_ops->data = hd;
	}

	total_file_size = 0;
	file_size = 0;

	for_each_file(td, f, i) {
		if (!td->o.file_size_low) {
			file_size = floor(td->o.size / td->o.nr_files);
			total_file_size += file_size;
		} else if (td->o.file_size_low == td->o.file_size_high) {
			file_size = td->o.file_size_low;
		} else {
			file_size = get_rand_file_size(td);
		}
		f->real_file_size = file_size;
	}
	/* If the size doesn't divide evenly by the chunck size,
	 * make the last file bigger.
	 * Used only if the file size was not given explicitly.
	 */
	if (!td->o.file_size_low && total_file_size < td->o.size) {
		f->real_file_size += (td->o.size - total_file_size);
	}

	return 0;
}
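/*
 * Connect to the HDFS namenode through an hdfsBuilder and switch to the
 * configured working directory. Registered as the io_u_init hook, so it
 * runs from the job thread while the io_u structures are being set up.
 */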
static int fio_hdfsio_io_u_init(struct thread_data *td, struct io_u *io_u)
{
	struct hdfsio_data *hd = td->io_ops->data;
	struct hdfsio_options *options = td->eo;
	int failure;
	struct hdfsBuilder *bld;

	if (options->host == NULL || options->port == 0) {
		log_err("hdfs: server not defined\n");
		return EINVAL;
	}

	bld = hdfsNewBuilder();
	if (!bld) {
		failure = errno;
		log_err("hdfs: unable to allocate connect builder\n");
		return failure;
	}
	hdfsBuilderSetNameNode(bld, options->host);
	hdfsBuilderSetNameNodePort(bld, options->port);
	if (!options->single_instance) {
		hdfsBuilderSetForceNewInstance(bld);
	}
	hd->fs = hdfsBuilderConnect(bld);

	/* hdfsSetWorkingDirectory() succeeds even on a non-existent directory,
	 * hence the explicit hdfsExists() check */
	if (hdfsExists(hd->fs, options->directory) < 0 || hdfsSetWorkingDirectory(hd->fs, options->directory) < 0) {
		failure = errno;
		log_err("hdfs: invalid working directory %s: %s\n", options->directory, strerror(errno));
		return failure;
	}

	return 0;
}
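/*
 * Drop the connection to the HDFS cluster when the io_u structures are
 * torn down.
 */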
static void fio_hdfsio_io_u_free(struct thread_data *td, struct io_u *io_u)
{
	struct hdfsio_data *hd = td->io_ops->data;

	if (hd->fs && hdfsDisconnect(hd->fs) < 0) {
		log_err("hdfs: disconnect failed: %d\n", errno);
	}
}

static struct ioengine_ops ioengine_hdfs = {
	.name			= "libhdfs",
	.version		= FIO_IOOPS_VERSION,
	.flags			= FIO_SYNCIO | FIO_DISKLESSIO | FIO_NODISKUTIL,
	.setup			= fio_hdfsio_setup,
	.init			= fio_hdfsio_init,
	.prep			= fio_hdfsio_prep,
	.queue			= fio_hdfsio_queue,
	.open_file		= fio_hdfsio_open_file,
	.close_file		= fio_hdfsio_close_file,
	.io_u_init		= fio_hdfsio_io_u_init,
	.io_u_free		= fio_hdfsio_io_u_free,
	.option_struct_size	= sizeof(struct hdfsio_options),
	.options		= options,
};

static void fio_init fio_hdfsio_register(void)
{
	register_ioengine(&ioengine_hdfs);
}

static void fio_exit fio_hdfsio_unregister(void)
{
	unregister_ioengine(&ioengine_hdfs);
}