Commit | Line | Data |
---|---|---|
1b10477b MM |
1 | /* |
2 | * libhdfs engine | |
3 | * | |
4 | * This engine performs read/write operations on an HDFS cluster using | |
5 | * libhdfs. HDFS does not support modifying data once a file is created. | |
6 | * | |
7 | * To work around that, each file is split into many small chunk files | |
8 | * (e.g. 256k), and the engine selects a chunk based on the offset generated by fio. | |
9 | * | |
10 | * Thus, random reads and writes can also be achieved with this logic. | |
11 | * | |
1b10477b MM |
12 | */ |
13 | ||
a3f001f5 FB |
14 | #include <math.h> |
15 | #include <hdfs.h> | |
1b10477b MM |
16 | |
17 | #include "../fio.h" | |
d220c761 | 18 | #include "../optgroup.h" |
a3f001f5 FB |
19 | |
/* Sizing constants for the per-offset chunk files. */
enum {
	/* Capacity of a buffer receiving a chunk name ("<file>_<id>"). */
	CHUNCK_NAME_LENGTH_MAX = 80,
	/* Bytes handed to each hdfsWrite() call while pre-filling a chunk. */
	CHUNCK_CREATION_BUFFER_SIZE = 65536,
};
1b10477b MM |
22 | |
23 | struct hdfsio_data { | |
1b10477b MM |
24 | hdfsFS fs; |
25 | hdfsFile fp; | |
a3f001f5 | 26 | uint64_t curr_file_id; |
1b10477b MM |
27 | }; |
28 | ||
a3f001f5 FB |
/*
 * Engine options, filled in by fio's option parser through the offsetof()
 * values of the options[] table.
 */
struct hdfsio_options {
	/*
	 * Placeholder occupying offset 0: an option defined with offsetof()
	 * may not have offset 0, so no real option can live here.
	 */
	void *pad;
	char *host;			/* namenode host */
	char *directory;		/* HDFS working directory holding the chunks */
	unsigned int port;		/* namenode port */
	unsigned int chunck_size;	/* size in bytes of each chunk file */
	unsigned int single_instance;	/* 0 forces a new HDFS instance on connect */
	unsigned int use_direct;	/* non-zero: read with readDirect() */
};
1b10477b | 38 | |
a3f001f5 FB |
39 | static struct fio_option options[] = { |
40 | { | |
41 | .name = "namenode", | |
42 | .lname = "hfds namenode", | |
43 | .type = FIO_OPT_STR_STORE, | |
44 | .off1 = offsetof(struct hdfsio_options, host), | |
45 | .def = "localhost", | |
46 | .help = "Namenode of the HDFS cluster", | |
47 | .category = FIO_OPT_C_ENGINE, | |
48 | .group = FIO_OPT_G_HDFS, | |
49 | }, | |
50 | { | |
51 | .name = "hostname", | |
52 | .lname = "hfds namenode", | |
53 | .type = FIO_OPT_STR_STORE, | |
54 | .off1 = offsetof(struct hdfsio_options, host), | |
55 | .def = "localhost", | |
56 | .help = "Namenode of the HDFS cluster", | |
57 | .category = FIO_OPT_C_ENGINE, | |
58 | .group = FIO_OPT_G_HDFS, | |
59 | }, | |
60 | { | |
61 | .name = "port", | |
62 | .lname = "hdfs namenode port", | |
63 | .type = FIO_OPT_INT, | |
64 | .off1 = offsetof(struct hdfsio_options, port), | |
65 | .def = "9000", | |
66 | .minval = 1, | |
67 | .maxval = 65535, | |
68 | .help = "Port used by the HDFS cluster namenode", | |
69 | .category = FIO_OPT_C_ENGINE, | |
70 | .group = FIO_OPT_G_HDFS, | |
71 | }, | |
72 | { | |
73 | .name = "hdfsdirectory", | |
74 | .lname = "hfds directory", | |
75 | .type = FIO_OPT_STR_STORE, | |
76 | .off1 = offsetof(struct hdfsio_options, directory), | |
77 | .def = "/", | |
78 | .help = "The HDFS directory where fio will create chuncks", | |
79 | .category = FIO_OPT_C_ENGINE, | |
80 | .group = FIO_OPT_G_HDFS, | |
81 | }, | |
82 | { | |
83 | .name = "chunck_size", | |
84 | .type = FIO_OPT_INT, | |
85 | .off1 = offsetof(struct hdfsio_options, chunck_size), | |
86 | .def = "1048576", | |
87 | .help = "Size of individual chunck", | |
88 | .category = FIO_OPT_C_ENGINE, | |
89 | .group = FIO_OPT_G_HDFS, | |
90 | }, | |
91 | { | |
92 | .name = "single_instance", | |
93 | .type = FIO_OPT_BOOL, | |
94 | .off1 = offsetof(struct hdfsio_options, single_instance), | |
95 | .def = "1", | |
96 | .help = "Use a single instance", | |
97 | .category = FIO_OPT_C_ENGINE, | |
98 | .group = FIO_OPT_G_HDFS, | |
99 | }, | |
100 | { | |
101 | .name = "hdfs_use_direct", | |
102 | .type = FIO_OPT_BOOL, | |
103 | .off1 = offsetof(struct hdfsio_options, use_direct), | |
104 | .def = "0", | |
105 | .help = "Use readDirect instead of hdfsRead", | |
106 | .category = FIO_OPT_C_ENGINE, | |
107 | .group = FIO_OPT_G_HDFS, | |
108 | }, | |
109 | { | |
110 | .name = NULL, | |
111 | }, | |
112 | }; | |
113 | ||
114 | ||
115 | static int get_chunck_name(char *dest, char *file_name, uint64_t chunk_id) { | |
116 | return snprintf(dest, CHUNCK_NAME_LENGTH_MAX, "%s_%lu", file_name, chunk_id); | |
1b10477b MM |
117 | } |
118 | ||
119 | static int fio_hdfsio_prep(struct thread_data *td, struct io_u *io_u) | |
120 | { | |
a3f001f5 FB |
121 | struct hdfsio_options *options = td->eo; |
122 | struct hdfsio_data *hd = td->io_ops->data; | |
1b10477b | 123 | unsigned long f_id; |
a3f001f5 FB |
124 | char fname[CHUNCK_NAME_LENGTH_MAX]; |
125 | int open_flags; | |
1b10477b MM |
126 | |
127 | /* find out file id based on the offset generated by fio */ | |
a3f001f5 | 128 | f_id = floor(io_u->offset / options-> chunck_size); |
1b10477b MM |
129 | |
130 | if (f_id == hd->curr_file_id) { | |
131 | /* file is already open */ | |
132 | return 0; | |
133 | } | |
134 | ||
135 | if (hd->curr_file_id != -1) { | |
a3f001f5 FB |
136 | if ( hdfsCloseFile(hd->fs, hd->fp) == -1) { |
137 | log_err("hdfs: unable to close file: %s\n", strerror(errno)); | |
138 | return errno; | |
139 | } | |
140 | hd->curr_file_id = -1; | |
1b10477b MM |
141 | } |
142 | ||
a3f001f5 | 143 | if (io_u->ddir == DDIR_READ || io_u->ddir == DDIR_SYNC) { |
1b10477b MM |
144 | open_flags = O_RDONLY; |
145 | } else if (io_u->ddir == DDIR_WRITE) { | |
146 | open_flags = O_WRONLY; | |
147 | } else { | |
7d4a8e7e | 148 | log_err("hdfs: Invalid I/O Operation\n"); |
a3f001f5 FB |
149 | return 0; |
150 | } | |
151 | ||
152 | get_chunck_name(fname, io_u->file->file_name, f_id); | |
153 | hd->fp = hdfsOpenFile(hd->fs, fname, open_flags, 0, 0, | |
154 | options->chunck_size); | |
155 | if(hd->fp == NULL) { | |
156 | log_err("hdfs: unable to open file: %s: %d\n", fname, strerror(errno)); | |
157 | return errno; | |
1b10477b | 158 | } |
1b10477b | 159 | hd->curr_file_id = f_id; |
1b10477b MM |
160 | |
161 | return 0; | |
162 | } | |
163 | ||
a3f001f5 | 164 | static int fio_hdfsio_queue(struct thread_data *td, struct io_u *io_u) |
1b10477b | 165 | { |
a3f001f5 FB |
166 | struct hdfsio_data *hd = td->io_ops->data; |
167 | struct hdfsio_options *options = td->eo; | |
168 | int ret; | |
169 | unsigned long offset; | |
170 | ||
171 | offset = io_u->offset % options->chunck_size; | |
172 | ||
173 | if( (io_u->ddir == DDIR_READ || io_u->ddir == DDIR_WRITE) && | |
174 | hdfsTell(hd->fs, hd->fp) != offset && hdfsSeek(hd->fs, hd->fp, offset) != 0 ) { | |
175 | log_err("hdfs: seek failed: %s, are you doing random write smaller than chunck size ?\n", strerror(errno)); | |
176 | io_u->error = errno; | |
177 | return FIO_Q_COMPLETED; | |
178 | }; | |
179 | ||
180 | // do the IO | |
181 | if (io_u->ddir == DDIR_READ) { | |
182 | if (options->use_direct) { | |
183 | ret = readDirect(hd->fs, hd->fp, io_u->xfer_buf, io_u->xfer_buflen); | |
184 | } else { | |
185 | ret = hdfsRead(hd->fs, hd->fp, io_u->xfer_buf, io_u->xfer_buflen); | |
186 | } | |
187 | } else if (io_u->ddir == DDIR_WRITE) { | |
188 | ret = hdfsWrite(hd->fs, hd->fp, io_u->xfer_buf, | |
189 | io_u->xfer_buflen); | |
190 | } else if (io_u->ddir == DDIR_SYNC) { | |
191 | ret = hdfsFlush(hd->fs, hd->fp); | |
192 | } else { | |
193 | log_err("hdfs: Invalid I/O Operation: %d\n", io_u->ddir); | |
194 | ret = EINVAL; | |
195 | } | |
196 | ||
197 | // Check if the IO went fine, or is incomplete | |
1b10477b MM |
198 | if (ret != (int)io_u->xfer_buflen) { |
199 | if (ret >= 0) { | |
200 | io_u->resid = io_u->xfer_buflen - ret; | |
201 | io_u->error = 0; | |
202 | return FIO_Q_COMPLETED; | |
a3f001f5 | 203 | } else { |
1b10477b | 204 | io_u->error = errno; |
a3f001f5 | 205 | } |
1b10477b MM |
206 | } |
207 | ||
208 | if (io_u->error) | |
209 | td_verror(td, io_u->error, "xfer"); | |
210 | ||
211 | return FIO_Q_COMPLETED; | |
212 | } | |
213 | ||
a3f001f5 | 214 | int fio_hdfsio_open_file(struct thread_data *td, struct fio_file *f) |
1b10477b | 215 | { |
a3f001f5 FB |
216 | if (td->o.odirect) { |
217 | td->error = EINVAL; | |
218 | return 0; | |
1b10477b MM |
219 | } |
220 | ||
a3f001f5 | 221 | return 0; |
1b10477b MM |
222 | } |
223 | ||
a3f001f5 | 224 | int fio_hdfsio_close_file(struct thread_data *td, struct fio_file *f) |
1b10477b | 225 | { |
a3f001f5 | 226 | struct hdfsio_data *hd = td->io_ops->data; |
1b10477b | 227 | |
a3f001f5 FB |
228 | if (hd->curr_file_id != -1) { |
229 | if ( hdfsCloseFile(hd->fs, hd->fp) == -1) { | |
230 | log_err("hdfs: unable to close file: %s\n", strerror(errno)); | |
231 | return errno; | |
232 | } | |
233 | hd->curr_file_id = -1; | |
234 | } | |
1b10477b MM |
235 | return 0; |
236 | } | |
237 | ||
a3f001f5 | 238 | static int fio_hdfsio_init(struct thread_data *td) |
1b10477b | 239 | { |
a3f001f5 FB |
240 | struct hdfsio_options *options = td->eo; |
241 | struct hdfsio_data *hd = td->io_ops->data; | |
242 | struct fio_file *f; | |
243 | uint64_t j,k; | |
244 | int i, failure = 0; | |
245 | uint8_t buffer[CHUNCK_CREATION_BUFFER_SIZE]; | |
246 | uint64_t bytes_left; | |
247 | char fname[CHUNCK_NAME_LENGTH_MAX]; | |
248 | hdfsFile fp; | |
249 | hdfsFileInfo *fi; | |
250 | tOffset fi_size; | |
251 | ||
252 | for_each_file(td, f, i) { | |
253 | k = 0; | |
254 | for(j=0; j < f->real_file_size; j += options->chunck_size) { | |
255 | get_chunck_name(fname, f->file_name, k++); | |
256 | fi = hdfsGetPathInfo(hd->fs, fname); | |
257 | fi_size = fi ? fi->mSize : 0; | |
258 | // fill exist and is big enough, nothing to do | |
259 | if( fi && fi_size >= options->chunck_size) { | |
260 | continue; | |
261 | } | |
262 | fp = hdfsOpenFile(hd->fs, fname, O_WRONLY, 0, 0, | |
263 | options->chunck_size); | |
264 | if(fp == NULL) { | |
265 | failure = errno; | |
266 | log_err("hdfs: unable to prepare file chunk %s: %s\n", fname, strerror(errno)); | |
267 | break; | |
268 | } | |
269 | bytes_left = options->chunck_size; | |
270 | memset(buffer, 0, CHUNCK_CREATION_BUFFER_SIZE); | |
271 | while( bytes_left > CHUNCK_CREATION_BUFFER_SIZE) { | |
272 | if( hdfsWrite(hd->fs, fp, buffer, CHUNCK_CREATION_BUFFER_SIZE) | |
273 | != CHUNCK_CREATION_BUFFER_SIZE) { | |
274 | failure = errno; | |
275 | log_err("hdfs: unable to prepare file chunk %s: %s\n", fname, strerror(errno)); | |
276 | break; | |
277 | }; | |
278 | bytes_left -= CHUNCK_CREATION_BUFFER_SIZE; | |
279 | } | |
280 | if(bytes_left > 0) { | |
281 | if( hdfsWrite(hd->fs, fp, buffer, bytes_left) | |
282 | != bytes_left) { | |
283 | failure = errno; | |
284 | break; | |
285 | }; | |
286 | } | |
287 | if( hdfsCloseFile(hd->fs, fp) != 0) { | |
288 | failure = errno; | |
289 | log_err("hdfs: unable to prepare file chunk %s: %s\n", fname, strerror(errno)); | |
290 | break; | |
291 | } | |
292 | } | |
293 | if(failure) { | |
294 | break; | |
295 | } | |
296 | } | |
297 | ||
298 | if( !failure ) { | |
299 | fio_file_set_size_known(f); | |
300 | } | |
1b10477b | 301 | |
a3f001f5 | 302 | return failure; |
1b10477b MM |
303 | } |
304 | ||
305 | static int fio_hdfsio_setup(struct thread_data *td) | |
306 | { | |
307 | struct hdfsio_data *hd; | |
308 | struct fio_file *f; | |
a3f001f5 FB |
309 | int i; |
310 | uint64_t file_size, total_file_size; | |
1b10477b MM |
311 | |
312 | if (!td->io_ops->data) { | |
a3f001f5 | 313 | hd = malloc(sizeof(*hd)); |
1b10477b | 314 | memset(hd, 0, sizeof(*hd)); |
a3f001f5 FB |
315 | |
316 | hd->curr_file_id = -1; | |
1b10477b | 317 | |
a3f001f5 FB |
318 | td->io_ops->data = hd; |
319 | } | |
320 | ||
321 | total_file_size = 0; | |
322 | file_size = 0; | |
323 | ||
324 | for_each_file(td, f, i) { | |
325 | if(!td->o.file_size_low) { | |
326 | file_size = floor(td->o.size / td->o.nr_files); | |
327 | total_file_size += file_size; | |
1b10477b | 328 | } |
a3f001f5 FB |
329 | else if (td->o.file_size_low == td->o.file_size_high) |
330 | file_size = td->o.file_size_low; | |
331 | else { | |
332 | file_size = get_rand_file_size(td); | |
333 | } | |
334 | f->real_file_size = file_size; | |
1b10477b | 335 | } |
a3f001f5 FB |
336 | /* If the size doesn't divide nicely with the chunck size, |
337 | * make the last files bigger. | |
338 | * Used only if filesize was not explicitely given | |
339 | */ | |
340 | if (!td->o.file_size_low && total_file_size < td->o.size) { | |
341 | f->real_file_size += (td->o.size - total_file_size); | |
342 | } | |
343 | ||
344 | return 0; | |
345 | } | |
1b10477b | 346 | |
a3f001f5 FB |
347 | static int fio_hdfsio_io_u_init(struct thread_data *td, struct io_u *io_u) |
348 | { | |
349 | struct hdfsio_data *hd = td->io_ops->data; | |
350 | struct hdfsio_options *options = td->eo; | |
351 | int failure; | |
352 | struct hdfsBuilder *bld; | |
353 | ||
354 | if (options->host == NULL || options->port == 0) { | |
355 | log_err("hdfs: server not defined\n"); | |
356 | return EINVAL; | |
357 | } | |
358 | ||
359 | bld = hdfsNewBuilder(); | |
360 | if (!bld) { | |
361 | failure = errno; | |
362 | log_err("hdfs: unable to allocate connect builder\n"); | |
363 | return failure; | |
364 | } | |
365 | hdfsBuilderSetNameNode(bld, options->host); | |
366 | hdfsBuilderSetNameNodePort(bld, options->port); | |
367 | if(! options->single_instance) { | |
368 | hdfsBuilderSetForceNewInstance(bld); | |
369 | } | |
370 | hd->fs = hdfsBuilderConnect(bld); | |
371 | ||
372 | /* hdfsSetWorkingDirectory succeed on non existend directory */ | |
373 | if (hdfsExists(hd->fs, options->directory) < 0 || hdfsSetWorkingDirectory(hd->fs, options->directory) < 0) { | |
374 | failure = errno; | |
375 | log_err("hdfs: invalid working directory %s: %s\n", options->directory, strerror(errno)); | |
376 | return failure; | |
377 | } | |
378 | ||
1b10477b MM |
379 | return 0; |
380 | } | |
381 | ||
a3f001f5 FB |
382 | static void fio_hdfsio_io_u_free(struct thread_data *td, struct io_u *io_u) |
383 | { | |
384 | struct hdfsio_data *hd = td->io_ops->data; | |
385 | ||
386 | if (hd->fs && hdfsDisconnect(hd->fs) < 0) { | |
387 | log_err("hdfs: disconnect failed: %d\n", errno); | |
388 | } | |
389 | } | |
390 | ||
1b10477b MM |
391 | static struct ioengine_ops ioengine_hdfs = { |
392 | .name = "libhdfs", | |
393 | .version = FIO_IOOPS_VERSION, | |
a3f001f5 | 394 | .flags = FIO_SYNCIO | FIO_DISKLESSIO | FIO_NODISKUTIL, |
1b10477b | 395 | .setup = fio_hdfsio_setup, |
a3f001f5 | 396 | .init = fio_hdfsio_init, |
1b10477b MM |
397 | .prep = fio_hdfsio_prep, |
398 | .queue = fio_hdfsio_queue, | |
399 | .open_file = fio_hdfsio_open_file, | |
400 | .close_file = fio_hdfsio_close_file, | |
a3f001f5 FB |
401 | .io_u_init = fio_hdfsio_io_u_init, |
402 | .io_u_free = fio_hdfsio_io_u_free, | |
403 | .option_struct_size = sizeof(struct hdfsio_options), | |
404 | .options = options, | |
1b10477b MM |
405 | }; |
406 | ||
a3f001f5 | 407 | |
1b10477b MM |
408 | static void fio_init fio_hdfsio_register(void) |
409 | { | |
410 | register_ioengine(&ioengine_hdfs); | |
411 | } | |
412 | ||
413 | static void fio_exit fio_hdfsio_unregister(void) | |
414 | { | |
415 | unregister_ioengine(&ioengine_hdfs); | |
416 | } |