Commit | Line | Data |
---|---|---|
1b10477b MM |
1 | /* |
2 | * libhdfs engine | |
3 | * | |
4 | * this engine helps perform read/write operations on hdfs cluster using | |
5 | * libhdfs. hdfs does not support modification of data once a file is created. | |
6 | * | |
7 | * so to mimic in-place updates, create many files of small size (e.g. 256k); this | |
8 | * engine selects a file based on the offset generated by fio. | |
9 | * | |
10 | * thus, random reads and writes can also be achieved with this logic. | |
11 | * | |
1b10477b MM |
12 | */ |
13 | ||
a3f001f5 FB |
14 | #include <math.h> |
15 | #include <hdfs.h> | |
1b10477b MM |
16 | |
17 | #include "../fio.h" | |
d220c761 | 18 | #include "../optgroup.h" |
a3f001f5 FB |
19 | |
/* Size in bytes of the buffers that hold a chunk name ("<file name>_<chunk id>"). */
#define CHUNCK_NAME_LENGTH_MAX 80
/* Size in bytes of the zero-filled buffer written repeatedly when pre-creating a chunk. */
#define CHUNCK_CREATION_BUFFER_SIZE 65536
1b10477b MM |
22 | |
23 | struct hdfsio_data { | |
1b10477b MM |
24 | hdfsFS fs; |
25 | hdfsFile fp; | |
a3f001f5 | 26 | uint64_t curr_file_id; |
1b10477b MM |
27 | }; |
28 | ||
a3f001f5 FB |
/*
 * Engine-specific options, filled in by the fio option parser through the
 * offsets recorded in the options[] table (reachable as td->eo).
 */
struct hdfsio_options {
	/*
	 * Placeholder so that no real option sits at offset 0: an option
	 * described with offsetof() cannot use a zero offset.
	 */
	void *pad;
	char *host;			/* namenode host name */
	char *directory;		/* HDFS directory holding the chunks */
	unsigned int port;		/* namenode port */
	unsigned int chunck_size;	/* size in bytes of one chunk file */
	unsigned int single_instance;	/* boolean: do not force a new instance */
	unsigned int use_direct;	/* boolean: read with readDirect() */
};
1b10477b | 38 | |
a3f001f5 FB |
39 | static struct fio_option options[] = { |
40 | { | |
41 | .name = "namenode", | |
42 | .lname = "hfds namenode", | |
43 | .type = FIO_OPT_STR_STORE, | |
44 | .off1 = offsetof(struct hdfsio_options, host), | |
45 | .def = "localhost", | |
46 | .help = "Namenode of the HDFS cluster", | |
47 | .category = FIO_OPT_C_ENGINE, | |
48 | .group = FIO_OPT_G_HDFS, | |
49 | }, | |
50 | { | |
51 | .name = "hostname", | |
52 | .lname = "hfds namenode", | |
53 | .type = FIO_OPT_STR_STORE, | |
54 | .off1 = offsetof(struct hdfsio_options, host), | |
55 | .def = "localhost", | |
56 | .help = "Namenode of the HDFS cluster", | |
57 | .category = FIO_OPT_C_ENGINE, | |
58 | .group = FIO_OPT_G_HDFS, | |
59 | }, | |
60 | { | |
61 | .name = "port", | |
62 | .lname = "hdfs namenode port", | |
63 | .type = FIO_OPT_INT, | |
64 | .off1 = offsetof(struct hdfsio_options, port), | |
65 | .def = "9000", | |
66 | .minval = 1, | |
67 | .maxval = 65535, | |
68 | .help = "Port used by the HDFS cluster namenode", | |
69 | .category = FIO_OPT_C_ENGINE, | |
70 | .group = FIO_OPT_G_HDFS, | |
71 | }, | |
72 | { | |
73 | .name = "hdfsdirectory", | |
74 | .lname = "hfds directory", | |
75 | .type = FIO_OPT_STR_STORE, | |
76 | .off1 = offsetof(struct hdfsio_options, directory), | |
77 | .def = "/", | |
78 | .help = "The HDFS directory where fio will create chuncks", | |
79 | .category = FIO_OPT_C_ENGINE, | |
80 | .group = FIO_OPT_G_HDFS, | |
81 | }, | |
82 | { | |
dda13f44 JA |
83 | .name = "chunk_size", |
84 | .alias = "chunck_size", | |
85 | .lname = "Chunk size", | |
a3f001f5 FB |
86 | .type = FIO_OPT_INT, |
87 | .off1 = offsetof(struct hdfsio_options, chunck_size), | |
88 | .def = "1048576", | |
89 | .help = "Size of individual chunck", | |
90 | .category = FIO_OPT_C_ENGINE, | |
91 | .group = FIO_OPT_G_HDFS, | |
92 | }, | |
93 | { | |
94 | .name = "single_instance", | |
dda13f44 | 95 | .lname = "Single Instance", |
a3f001f5 FB |
96 | .type = FIO_OPT_BOOL, |
97 | .off1 = offsetof(struct hdfsio_options, single_instance), | |
98 | .def = "1", | |
99 | .help = "Use a single instance", | |
100 | .category = FIO_OPT_C_ENGINE, | |
101 | .group = FIO_OPT_G_HDFS, | |
102 | }, | |
103 | { | |
104 | .name = "hdfs_use_direct", | |
dda13f44 | 105 | .lname = "HDFS Use Direct", |
a3f001f5 FB |
106 | .type = FIO_OPT_BOOL, |
107 | .off1 = offsetof(struct hdfsio_options, use_direct), | |
108 | .def = "0", | |
109 | .help = "Use readDirect instead of hdfsRead", | |
110 | .category = FIO_OPT_C_ENGINE, | |
111 | .group = FIO_OPT_G_HDFS, | |
112 | }, | |
113 | { | |
114 | .name = NULL, | |
115 | }, | |
116 | }; | |
117 | ||
118 | ||
119 | static int get_chunck_name(char *dest, char *file_name, uint64_t chunk_id) { | |
120 | return snprintf(dest, CHUNCK_NAME_LENGTH_MAX, "%s_%lu", file_name, chunk_id); | |
1b10477b MM |
121 | } |
122 | ||
123 | static int fio_hdfsio_prep(struct thread_data *td, struct io_u *io_u) | |
124 | { | |
a3f001f5 | 125 | struct hdfsio_options *options = td->eo; |
565e784d | 126 | struct hdfsio_data *hd = td->io_ops_data; |
1b10477b | 127 | unsigned long f_id; |
a3f001f5 FB |
128 | char fname[CHUNCK_NAME_LENGTH_MAX]; |
129 | int open_flags; | |
1b10477b MM |
130 | |
131 | /* find out file id based on the offset generated by fio */ | |
a3f001f5 | 132 | f_id = floor(io_u->offset / options-> chunck_size); |
1b10477b MM |
133 | |
134 | if (f_id == hd->curr_file_id) { | |
135 | /* file is already open */ | |
136 | return 0; | |
137 | } | |
138 | ||
139 | if (hd->curr_file_id != -1) { | |
a3f001f5 FB |
140 | if ( hdfsCloseFile(hd->fs, hd->fp) == -1) { |
141 | log_err("hdfs: unable to close file: %s\n", strerror(errno)); | |
142 | return errno; | |
143 | } | |
144 | hd->curr_file_id = -1; | |
1b10477b MM |
145 | } |
146 | ||
a3f001f5 | 147 | if (io_u->ddir == DDIR_READ || io_u->ddir == DDIR_SYNC) { |
1b10477b MM |
148 | open_flags = O_RDONLY; |
149 | } else if (io_u->ddir == DDIR_WRITE) { | |
150 | open_flags = O_WRONLY; | |
151 | } else { | |
7d4a8e7e | 152 | log_err("hdfs: Invalid I/O Operation\n"); |
a3f001f5 FB |
153 | return 0; |
154 | } | |
155 | ||
156 | get_chunck_name(fname, io_u->file->file_name, f_id); | |
157 | hd->fp = hdfsOpenFile(hd->fs, fname, open_flags, 0, 0, | |
158 | options->chunck_size); | |
159 | if(hd->fp == NULL) { | |
160 | log_err("hdfs: unable to open file: %s: %d\n", fname, strerror(errno)); | |
161 | return errno; | |
1b10477b | 162 | } |
1b10477b | 163 | hd->curr_file_id = f_id; |
1b10477b MM |
164 | |
165 | return 0; | |
166 | } | |
167 | ||
a3f001f5 | 168 | static int fio_hdfsio_queue(struct thread_data *td, struct io_u *io_u) |
1b10477b | 169 | { |
565e784d | 170 | struct hdfsio_data *hd = td->io_ops_data; |
a3f001f5 FB |
171 | struct hdfsio_options *options = td->eo; |
172 | int ret; | |
173 | unsigned long offset; | |
174 | ||
175 | offset = io_u->offset % options->chunck_size; | |
176 | ||
177 | if( (io_u->ddir == DDIR_READ || io_u->ddir == DDIR_WRITE) && | |
178 | hdfsTell(hd->fs, hd->fp) != offset && hdfsSeek(hd->fs, hd->fp, offset) != 0 ) { | |
179 | log_err("hdfs: seek failed: %s, are you doing random write smaller than chunck size ?\n", strerror(errno)); | |
180 | io_u->error = errno; | |
181 | return FIO_Q_COMPLETED; | |
182 | }; | |
183 | ||
184 | // do the IO | |
185 | if (io_u->ddir == DDIR_READ) { | |
186 | if (options->use_direct) { | |
187 | ret = readDirect(hd->fs, hd->fp, io_u->xfer_buf, io_u->xfer_buflen); | |
188 | } else { | |
189 | ret = hdfsRead(hd->fs, hd->fp, io_u->xfer_buf, io_u->xfer_buflen); | |
190 | } | |
191 | } else if (io_u->ddir == DDIR_WRITE) { | |
192 | ret = hdfsWrite(hd->fs, hd->fp, io_u->xfer_buf, | |
193 | io_u->xfer_buflen); | |
194 | } else if (io_u->ddir == DDIR_SYNC) { | |
195 | ret = hdfsFlush(hd->fs, hd->fp); | |
196 | } else { | |
197 | log_err("hdfs: Invalid I/O Operation: %d\n", io_u->ddir); | |
198 | ret = EINVAL; | |
199 | } | |
200 | ||
201 | // Check if the IO went fine, or is incomplete | |
1b10477b MM |
202 | if (ret != (int)io_u->xfer_buflen) { |
203 | if (ret >= 0) { | |
204 | io_u->resid = io_u->xfer_buflen - ret; | |
205 | io_u->error = 0; | |
206 | return FIO_Q_COMPLETED; | |
a3f001f5 | 207 | } else { |
1b10477b | 208 | io_u->error = errno; |
a3f001f5 | 209 | } |
1b10477b MM |
210 | } |
211 | ||
212 | if (io_u->error) | |
213 | td_verror(td, io_u->error, "xfer"); | |
214 | ||
215 | return FIO_Q_COMPLETED; | |
216 | } | |
217 | ||
a3f001f5 | 218 | int fio_hdfsio_open_file(struct thread_data *td, struct fio_file *f) |
1b10477b | 219 | { |
a3f001f5 FB |
220 | if (td->o.odirect) { |
221 | td->error = EINVAL; | |
222 | return 0; | |
1b10477b MM |
223 | } |
224 | ||
a3f001f5 | 225 | return 0; |
1b10477b MM |
226 | } |
227 | ||
a3f001f5 | 228 | int fio_hdfsio_close_file(struct thread_data *td, struct fio_file *f) |
1b10477b | 229 | { |
565e784d | 230 | struct hdfsio_data *hd = td->io_ops_data; |
1b10477b | 231 | |
a3f001f5 FB |
232 | if (hd->curr_file_id != -1) { |
233 | if ( hdfsCloseFile(hd->fs, hd->fp) == -1) { | |
234 | log_err("hdfs: unable to close file: %s\n", strerror(errno)); | |
235 | return errno; | |
236 | } | |
237 | hd->curr_file_id = -1; | |
238 | } | |
1b10477b MM |
239 | return 0; |
240 | } | |
241 | ||
a3f001f5 | 242 | static int fio_hdfsio_init(struct thread_data *td) |
1b10477b | 243 | { |
a3f001f5 | 244 | struct hdfsio_options *options = td->eo; |
565e784d | 245 | struct hdfsio_data *hd = td->io_ops_data; |
a3f001f5 FB |
246 | struct fio_file *f; |
247 | uint64_t j,k; | |
248 | int i, failure = 0; | |
249 | uint8_t buffer[CHUNCK_CREATION_BUFFER_SIZE]; | |
250 | uint64_t bytes_left; | |
251 | char fname[CHUNCK_NAME_LENGTH_MAX]; | |
252 | hdfsFile fp; | |
253 | hdfsFileInfo *fi; | |
254 | tOffset fi_size; | |
255 | ||
256 | for_each_file(td, f, i) { | |
257 | k = 0; | |
258 | for(j=0; j < f->real_file_size; j += options->chunck_size) { | |
259 | get_chunck_name(fname, f->file_name, k++); | |
260 | fi = hdfsGetPathInfo(hd->fs, fname); | |
261 | fi_size = fi ? fi->mSize : 0; | |
262 | // fill exist and is big enough, nothing to do | |
263 | if( fi && fi_size >= options->chunck_size) { | |
264 | continue; | |
265 | } | |
266 | fp = hdfsOpenFile(hd->fs, fname, O_WRONLY, 0, 0, | |
267 | options->chunck_size); | |
268 | if(fp == NULL) { | |
269 | failure = errno; | |
270 | log_err("hdfs: unable to prepare file chunk %s: %s\n", fname, strerror(errno)); | |
271 | break; | |
272 | } | |
273 | bytes_left = options->chunck_size; | |
274 | memset(buffer, 0, CHUNCK_CREATION_BUFFER_SIZE); | |
275 | while( bytes_left > CHUNCK_CREATION_BUFFER_SIZE) { | |
276 | if( hdfsWrite(hd->fs, fp, buffer, CHUNCK_CREATION_BUFFER_SIZE) | |
277 | != CHUNCK_CREATION_BUFFER_SIZE) { | |
278 | failure = errno; | |
279 | log_err("hdfs: unable to prepare file chunk %s: %s\n", fname, strerror(errno)); | |
280 | break; | |
281 | }; | |
282 | bytes_left -= CHUNCK_CREATION_BUFFER_SIZE; | |
283 | } | |
284 | if(bytes_left > 0) { | |
285 | if( hdfsWrite(hd->fs, fp, buffer, bytes_left) | |
286 | != bytes_left) { | |
287 | failure = errno; | |
288 | break; | |
289 | }; | |
290 | } | |
291 | if( hdfsCloseFile(hd->fs, fp) != 0) { | |
292 | failure = errno; | |
293 | log_err("hdfs: unable to prepare file chunk %s: %s\n", fname, strerror(errno)); | |
294 | break; | |
295 | } | |
296 | } | |
297 | if(failure) { | |
298 | break; | |
299 | } | |
300 | } | |
301 | ||
302 | if( !failure ) { | |
303 | fio_file_set_size_known(f); | |
304 | } | |
1b10477b | 305 | |
a3f001f5 | 306 | return failure; |
1b10477b MM |
307 | } |
308 | ||
309 | static int fio_hdfsio_setup(struct thread_data *td) | |
310 | { | |
311 | struct hdfsio_data *hd; | |
312 | struct fio_file *f; | |
a3f001f5 FB |
313 | int i; |
314 | uint64_t file_size, total_file_size; | |
1b10477b | 315 | |
565e784d | 316 | if (!td->io_ops_data) { |
a3f001f5 | 317 | hd = malloc(sizeof(*hd)); |
1b10477b | 318 | memset(hd, 0, sizeof(*hd)); |
a3f001f5 FB |
319 | |
320 | hd->curr_file_id = -1; | |
1b10477b | 321 | |
565e784d | 322 | td->io_ops_data = hd; |
a3f001f5 FB |
323 | } |
324 | ||
325 | total_file_size = 0; | |
326 | file_size = 0; | |
327 | ||
328 | for_each_file(td, f, i) { | |
329 | if(!td->o.file_size_low) { | |
330 | file_size = floor(td->o.size / td->o.nr_files); | |
331 | total_file_size += file_size; | |
1b10477b | 332 | } |
a3f001f5 FB |
333 | else if (td->o.file_size_low == td->o.file_size_high) |
334 | file_size = td->o.file_size_low; | |
335 | else { | |
336 | file_size = get_rand_file_size(td); | |
337 | } | |
338 | f->real_file_size = file_size; | |
1b10477b | 339 | } |
a3f001f5 FB |
340 | /* If the size doesn't divide nicely with the chunck size, |
341 | * make the last files bigger. | |
342 | * Used only if filesize was not explicitely given | |
343 | */ | |
344 | if (!td->o.file_size_low && total_file_size < td->o.size) { | |
345 | f->real_file_size += (td->o.size - total_file_size); | |
346 | } | |
347 | ||
348 | return 0; | |
349 | } | |
1b10477b | 350 | |
a3f001f5 FB |
351 | static int fio_hdfsio_io_u_init(struct thread_data *td, struct io_u *io_u) |
352 | { | |
565e784d | 353 | struct hdfsio_data *hd = td->io_ops_data; |
a3f001f5 FB |
354 | struct hdfsio_options *options = td->eo; |
355 | int failure; | |
356 | struct hdfsBuilder *bld; | |
357 | ||
358 | if (options->host == NULL || options->port == 0) { | |
359 | log_err("hdfs: server not defined\n"); | |
360 | return EINVAL; | |
361 | } | |
362 | ||
363 | bld = hdfsNewBuilder(); | |
364 | if (!bld) { | |
365 | failure = errno; | |
366 | log_err("hdfs: unable to allocate connect builder\n"); | |
367 | return failure; | |
368 | } | |
369 | hdfsBuilderSetNameNode(bld, options->host); | |
370 | hdfsBuilderSetNameNodePort(bld, options->port); | |
371 | if(! options->single_instance) { | |
372 | hdfsBuilderSetForceNewInstance(bld); | |
373 | } | |
374 | hd->fs = hdfsBuilderConnect(bld); | |
375 | ||
376 | /* hdfsSetWorkingDirectory succeed on non existend directory */ | |
377 | if (hdfsExists(hd->fs, options->directory) < 0 || hdfsSetWorkingDirectory(hd->fs, options->directory) < 0) { | |
378 | failure = errno; | |
379 | log_err("hdfs: invalid working directory %s: %s\n", options->directory, strerror(errno)); | |
380 | return failure; | |
381 | } | |
382 | ||
1b10477b MM |
383 | return 0; |
384 | } | |
385 | ||
a3f001f5 FB |
386 | static void fio_hdfsio_io_u_free(struct thread_data *td, struct io_u *io_u) |
387 | { | |
565e784d | 388 | struct hdfsio_data *hd = td->io_ops_data; |
a3f001f5 FB |
389 | |
390 | if (hd->fs && hdfsDisconnect(hd->fs) < 0) { | |
391 | log_err("hdfs: disconnect failed: %d\n", errno); | |
392 | } | |
393 | } | |
394 | ||
1b10477b MM |
395 | static struct ioengine_ops ioengine_hdfs = { |
396 | .name = "libhdfs", | |
397 | .version = FIO_IOOPS_VERSION, | |
a3f001f5 | 398 | .flags = FIO_SYNCIO | FIO_DISKLESSIO | FIO_NODISKUTIL, |
1b10477b | 399 | .setup = fio_hdfsio_setup, |
a3f001f5 | 400 | .init = fio_hdfsio_init, |
1b10477b MM |
401 | .prep = fio_hdfsio_prep, |
402 | .queue = fio_hdfsio_queue, | |
403 | .open_file = fio_hdfsio_open_file, | |
404 | .close_file = fio_hdfsio_close_file, | |
a3f001f5 FB |
405 | .io_u_init = fio_hdfsio_io_u_init, |
406 | .io_u_free = fio_hdfsio_io_u_free, | |
407 | .option_struct_size = sizeof(struct hdfsio_options), | |
408 | .options = options, | |
1b10477b MM |
409 | }; |
410 | ||
a3f001f5 | 411 | |
1b10477b MM |
412 | static void fio_init fio_hdfsio_register(void) |
413 | { | |
414 | register_ioengine(&ioengine_hdfs); | |
415 | } | |
416 | ||
417 | static void fio_exit fio_hdfsio_unregister(void) | |
418 | { | |
419 | unregister_ioengine(&ioengine_hdfs); | |
420 | } |