Commit | Line | Data |
---|---|---|
1b10477b MM |
/*
 * libhdfs engine
 *
 * This engine performs read/write operations on an HDFS cluster using
 * libhdfs. HDFS does not support modifying data once a file has been
 * created.
 *
 * To mimic in-place I/O, the engine creates many small files ("chunks",
 * e.g. 256k each) and selects a chunk based on the offset generated by fio.
 *
 * This way, random reads and writes can also be achieved.
 */
13 | ||
a3f001f5 FB |
14 | #include <math.h> |
15 | #include <hdfs.h> | |
1b10477b MM |
16 | |
17 | #include "../fio.h" | |
d220c761 | 18 | #include "../optgroup.h" |
a3f001f5 FB |
19 | |
/*
 * Engine-wide size constants:
 *  - CHUNCK_NAME_LENGTH_MAX: size of the buffer that holds a chunk's HDFS
 *    path ("<file_name>_<chunk id>"), terminating NUL included.
 *  - CHUNCK_CREATION_BUFFER_SIZE: size of the zero-filled block written
 *    repeatedly when a chunk file has to be created.
 */
enum {
	CHUNCK_NAME_LENGTH_MAX = 80,
	CHUNCK_CREATION_BUFFER_SIZE = 65536,
};
1b10477b MM |
22 | |
23 | struct hdfsio_data { | |
1b10477b MM |
24 | hdfsFS fs; |
25 | hdfsFile fp; | |
a3f001f5 | 26 | uint64_t curr_file_id; |
1b10477b MM |
27 | }; |
28 | ||
/* Engine options, filled in by fio's option parser via offsetof(). */
struct hdfsio_options {
	/*
	 * Keeps every real option at a non-zero offset: an option declared
	 * with offsetof() must not have offset 0.
	 */
	void *pad;
	/* namenode host and working directory on HDFS */
	char *host, *directory;
	/* namenode port and size of each chunk file */
	unsigned int port, chunck_size;
	/* boolean options */
	unsigned int single_instance, use_direct;
};
1b10477b | 38 | |
a3f001f5 FB |
39 | static struct fio_option options[] = { |
40 | { | |
41 | .name = "namenode", | |
42 | .lname = "hfds namenode", | |
43 | .type = FIO_OPT_STR_STORE, | |
44 | .off1 = offsetof(struct hdfsio_options, host), | |
45 | .def = "localhost", | |
46 | .help = "Namenode of the HDFS cluster", | |
47 | .category = FIO_OPT_C_ENGINE, | |
48 | .group = FIO_OPT_G_HDFS, | |
49 | }, | |
50 | { | |
51 | .name = "hostname", | |
52 | .lname = "hfds namenode", | |
53 | .type = FIO_OPT_STR_STORE, | |
54 | .off1 = offsetof(struct hdfsio_options, host), | |
55 | .def = "localhost", | |
56 | .help = "Namenode of the HDFS cluster", | |
57 | .category = FIO_OPT_C_ENGINE, | |
58 | .group = FIO_OPT_G_HDFS, | |
59 | }, | |
60 | { | |
61 | .name = "port", | |
62 | .lname = "hdfs namenode port", | |
63 | .type = FIO_OPT_INT, | |
64 | .off1 = offsetof(struct hdfsio_options, port), | |
65 | .def = "9000", | |
66 | .minval = 1, | |
67 | .maxval = 65535, | |
68 | .help = "Port used by the HDFS cluster namenode", | |
69 | .category = FIO_OPT_C_ENGINE, | |
70 | .group = FIO_OPT_G_HDFS, | |
71 | }, | |
72 | { | |
73 | .name = "hdfsdirectory", | |
74 | .lname = "hfds directory", | |
75 | .type = FIO_OPT_STR_STORE, | |
76 | .off1 = offsetof(struct hdfsio_options, directory), | |
77 | .def = "/", | |
6ef7e18d | 78 | .help = "The HDFS directory where fio will create chunks", |
a3f001f5 FB |
79 | .category = FIO_OPT_C_ENGINE, |
80 | .group = FIO_OPT_G_HDFS, | |
81 | }, | |
82 | { | |
dda13f44 JA |
83 | .name = "chunk_size", |
84 | .alias = "chunck_size", | |
85 | .lname = "Chunk size", | |
a3f001f5 FB |
86 | .type = FIO_OPT_INT, |
87 | .off1 = offsetof(struct hdfsio_options, chunck_size), | |
88 | .def = "1048576", | |
6ef7e18d | 89 | .help = "Size of individual chunk", |
a3f001f5 FB |
90 | .category = FIO_OPT_C_ENGINE, |
91 | .group = FIO_OPT_G_HDFS, | |
92 | }, | |
93 | { | |
94 | .name = "single_instance", | |
dda13f44 | 95 | .lname = "Single Instance", |
a3f001f5 FB |
96 | .type = FIO_OPT_BOOL, |
97 | .off1 = offsetof(struct hdfsio_options, single_instance), | |
98 | .def = "1", | |
99 | .help = "Use a single instance", | |
100 | .category = FIO_OPT_C_ENGINE, | |
101 | .group = FIO_OPT_G_HDFS, | |
102 | }, | |
103 | { | |
104 | .name = "hdfs_use_direct", | |
dda13f44 | 105 | .lname = "HDFS Use Direct", |
a3f001f5 FB |
106 | .type = FIO_OPT_BOOL, |
107 | .off1 = offsetof(struct hdfsio_options, use_direct), | |
108 | .def = "0", | |
109 | .help = "Use readDirect instead of hdfsRead", | |
110 | .category = FIO_OPT_C_ENGINE, | |
111 | .group = FIO_OPT_G_HDFS, | |
112 | }, | |
113 | { | |
114 | .name = NULL, | |
115 | }, | |
116 | }; | |
117 | ||
118 | ||
119 | static int get_chunck_name(char *dest, char *file_name, uint64_t chunk_id) { | |
120 | return snprintf(dest, CHUNCK_NAME_LENGTH_MAX, "%s_%lu", file_name, chunk_id); | |
1b10477b MM |
121 | } |
122 | ||
123 | static int fio_hdfsio_prep(struct thread_data *td, struct io_u *io_u) | |
124 | { | |
a3f001f5 | 125 | struct hdfsio_options *options = td->eo; |
565e784d | 126 | struct hdfsio_data *hd = td->io_ops_data; |
1b10477b | 127 | unsigned long f_id; |
a3f001f5 FB |
128 | char fname[CHUNCK_NAME_LENGTH_MAX]; |
129 | int open_flags; | |
1b10477b MM |
130 | |
131 | /* find out file id based on the offset generated by fio */ | |
a3f001f5 | 132 | f_id = floor(io_u->offset / options-> chunck_size); |
1b10477b MM |
133 | |
134 | if (f_id == hd->curr_file_id) { | |
135 | /* file is already open */ | |
136 | return 0; | |
137 | } | |
138 | ||
139 | if (hd->curr_file_id != -1) { | |
a3f001f5 FB |
140 | if ( hdfsCloseFile(hd->fs, hd->fp) == -1) { |
141 | log_err("hdfs: unable to close file: %s\n", strerror(errno)); | |
142 | return errno; | |
143 | } | |
144 | hd->curr_file_id = -1; | |
1b10477b MM |
145 | } |
146 | ||
a3f001f5 | 147 | if (io_u->ddir == DDIR_READ || io_u->ddir == DDIR_SYNC) { |
1b10477b MM |
148 | open_flags = O_RDONLY; |
149 | } else if (io_u->ddir == DDIR_WRITE) { | |
150 | open_flags = O_WRONLY; | |
151 | } else { | |
7d4a8e7e | 152 | log_err("hdfs: Invalid I/O Operation\n"); |
a3f001f5 FB |
153 | return 0; |
154 | } | |
155 | ||
156 | get_chunck_name(fname, io_u->file->file_name, f_id); | |
157 | hd->fp = hdfsOpenFile(hd->fs, fname, open_flags, 0, 0, | |
158 | options->chunck_size); | |
159 | if(hd->fp == NULL) { | |
160 | log_err("hdfs: unable to open file: %s: %d\n", fname, strerror(errno)); | |
161 | return errno; | |
1b10477b | 162 | } |
1b10477b | 163 | hd->curr_file_id = f_id; |
1b10477b MM |
164 | |
165 | return 0; | |
166 | } | |
167 | ||
2e4ef4fb JA |
168 | static enum fio_q_status fio_hdfsio_queue(struct thread_data *td, |
169 | struct io_u *io_u) | |
1b10477b | 170 | { |
565e784d | 171 | struct hdfsio_data *hd = td->io_ops_data; |
a3f001f5 FB |
172 | struct hdfsio_options *options = td->eo; |
173 | int ret; | |
174 | unsigned long offset; | |
175 | ||
176 | offset = io_u->offset % options->chunck_size; | |
177 | ||
178 | if( (io_u->ddir == DDIR_READ || io_u->ddir == DDIR_WRITE) && | |
179 | hdfsTell(hd->fs, hd->fp) != offset && hdfsSeek(hd->fs, hd->fp, offset) != 0 ) { | |
6ef7e18d | 180 | log_err("hdfs: seek failed: %s, are you doing random write smaller than chunk size ?\n", strerror(errno)); |
a3f001f5 FB |
181 | io_u->error = errno; |
182 | return FIO_Q_COMPLETED; | |
183 | }; | |
184 | ||
185 | // do the IO | |
186 | if (io_u->ddir == DDIR_READ) { | |
187 | if (options->use_direct) { | |
188 | ret = readDirect(hd->fs, hd->fp, io_u->xfer_buf, io_u->xfer_buflen); | |
189 | } else { | |
190 | ret = hdfsRead(hd->fs, hd->fp, io_u->xfer_buf, io_u->xfer_buflen); | |
191 | } | |
192 | } else if (io_u->ddir == DDIR_WRITE) { | |
193 | ret = hdfsWrite(hd->fs, hd->fp, io_u->xfer_buf, | |
194 | io_u->xfer_buflen); | |
195 | } else if (io_u->ddir == DDIR_SYNC) { | |
196 | ret = hdfsFlush(hd->fs, hd->fp); | |
197 | } else { | |
198 | log_err("hdfs: Invalid I/O Operation: %d\n", io_u->ddir); | |
199 | ret = EINVAL; | |
200 | } | |
201 | ||
202 | // Check if the IO went fine, or is incomplete | |
1b10477b MM |
203 | if (ret != (int)io_u->xfer_buflen) { |
204 | if (ret >= 0) { | |
205 | io_u->resid = io_u->xfer_buflen - ret; | |
206 | io_u->error = 0; | |
207 | return FIO_Q_COMPLETED; | |
a3f001f5 | 208 | } else { |
1b10477b | 209 | io_u->error = errno; |
a3f001f5 | 210 | } |
1b10477b MM |
211 | } |
212 | ||
213 | if (io_u->error) | |
214 | td_verror(td, io_u->error, "xfer"); | |
215 | ||
216 | return FIO_Q_COMPLETED; | |
217 | } | |
218 | ||
a3f001f5 | 219 | int fio_hdfsio_open_file(struct thread_data *td, struct fio_file *f) |
1b10477b | 220 | { |
a3f001f5 FB |
221 | if (td->o.odirect) { |
222 | td->error = EINVAL; | |
223 | return 0; | |
1b10477b MM |
224 | } |
225 | ||
a3f001f5 | 226 | return 0; |
1b10477b MM |
227 | } |
228 | ||
a3f001f5 | 229 | int fio_hdfsio_close_file(struct thread_data *td, struct fio_file *f) |
1b10477b | 230 | { |
565e784d | 231 | struct hdfsio_data *hd = td->io_ops_data; |
1b10477b | 232 | |
a3f001f5 FB |
233 | if (hd->curr_file_id != -1) { |
234 | if ( hdfsCloseFile(hd->fs, hd->fp) == -1) { | |
235 | log_err("hdfs: unable to close file: %s\n", strerror(errno)); | |
236 | return errno; | |
237 | } | |
238 | hd->curr_file_id = -1; | |
239 | } | |
1b10477b MM |
240 | return 0; |
241 | } | |
242 | ||
e2fba3ab | 243 | static int fio_hdfsio_io_u_init(struct thread_data *td, struct io_u *io_u) |
1b10477b | 244 | { |
a3f001f5 | 245 | struct hdfsio_options *options = td->eo; |
565e784d | 246 | struct hdfsio_data *hd = td->io_ops_data; |
a3f001f5 FB |
247 | struct fio_file *f; |
248 | uint64_t j,k; | |
249 | int i, failure = 0; | |
250 | uint8_t buffer[CHUNCK_CREATION_BUFFER_SIZE]; | |
251 | uint64_t bytes_left; | |
252 | char fname[CHUNCK_NAME_LENGTH_MAX]; | |
253 | hdfsFile fp; | |
254 | hdfsFileInfo *fi; | |
255 | tOffset fi_size; | |
256 | ||
257 | for_each_file(td, f, i) { | |
258 | k = 0; | |
259 | for(j=0; j < f->real_file_size; j += options->chunck_size) { | |
260 | get_chunck_name(fname, f->file_name, k++); | |
261 | fi = hdfsGetPathInfo(hd->fs, fname); | |
262 | fi_size = fi ? fi->mSize : 0; | |
263 | // fill exist and is big enough, nothing to do | |
264 | if( fi && fi_size >= options->chunck_size) { | |
265 | continue; | |
266 | } | |
267 | fp = hdfsOpenFile(hd->fs, fname, O_WRONLY, 0, 0, | |
268 | options->chunck_size); | |
269 | if(fp == NULL) { | |
270 | failure = errno; | |
271 | log_err("hdfs: unable to prepare file chunk %s: %s\n", fname, strerror(errno)); | |
272 | break; | |
273 | } | |
274 | bytes_left = options->chunck_size; | |
275 | memset(buffer, 0, CHUNCK_CREATION_BUFFER_SIZE); | |
276 | while( bytes_left > CHUNCK_CREATION_BUFFER_SIZE) { | |
277 | if( hdfsWrite(hd->fs, fp, buffer, CHUNCK_CREATION_BUFFER_SIZE) | |
278 | != CHUNCK_CREATION_BUFFER_SIZE) { | |
279 | failure = errno; | |
280 | log_err("hdfs: unable to prepare file chunk %s: %s\n", fname, strerror(errno)); | |
281 | break; | |
282 | }; | |
283 | bytes_left -= CHUNCK_CREATION_BUFFER_SIZE; | |
284 | } | |
285 | if(bytes_left > 0) { | |
286 | if( hdfsWrite(hd->fs, fp, buffer, bytes_left) | |
287 | != bytes_left) { | |
288 | failure = errno; | |
289 | break; | |
290 | }; | |
291 | } | |
292 | if( hdfsCloseFile(hd->fs, fp) != 0) { | |
293 | failure = errno; | |
294 | log_err("hdfs: unable to prepare file chunk %s: %s\n", fname, strerror(errno)); | |
295 | break; | |
296 | } | |
297 | } | |
298 | if(failure) { | |
299 | break; | |
300 | } | |
301 | } | |
302 | ||
303 | if( !failure ) { | |
304 | fio_file_set_size_known(f); | |
305 | } | |
1b10477b | 306 | |
a3f001f5 | 307 | return failure; |
1b10477b MM |
308 | } |
309 | ||
310 | static int fio_hdfsio_setup(struct thread_data *td) | |
311 | { | |
312 | struct hdfsio_data *hd; | |
313 | struct fio_file *f; | |
a3f001f5 FB |
314 | int i; |
315 | uint64_t file_size, total_file_size; | |
1b10477b | 316 | |
565e784d | 317 | if (!td->io_ops_data) { |
a3f001f5 | 318 | hd = malloc(sizeof(*hd)); |
1b10477b | 319 | memset(hd, 0, sizeof(*hd)); |
a3f001f5 FB |
320 | |
321 | hd->curr_file_id = -1; | |
1b10477b | 322 | |
565e784d | 323 | td->io_ops_data = hd; |
a3f001f5 FB |
324 | } |
325 | ||
326 | total_file_size = 0; | |
327 | file_size = 0; | |
328 | ||
329 | for_each_file(td, f, i) { | |
330 | if(!td->o.file_size_low) { | |
331 | file_size = floor(td->o.size / td->o.nr_files); | |
332 | total_file_size += file_size; | |
1b10477b | 333 | } |
a3f001f5 FB |
334 | else if (td->o.file_size_low == td->o.file_size_high) |
335 | file_size = td->o.file_size_low; | |
336 | else { | |
337 | file_size = get_rand_file_size(td); | |
338 | } | |
339 | f->real_file_size = file_size; | |
1b10477b | 340 | } |
6ef7e18d | 341 | /* If the size doesn't divide nicely with the chunk size, |
a3f001f5 | 342 | * make the last files bigger. |
6ef7e18d | 343 | * Used only if filesize was not explicitly given |
a3f001f5 FB |
344 | */ |
345 | if (!td->o.file_size_low && total_file_size < td->o.size) { | |
346 | f->real_file_size += (td->o.size - total_file_size); | |
347 | } | |
348 | ||
349 | return 0; | |
350 | } | |
1b10477b | 351 | |
e2fba3ab | 352 | static int fio_hdfsio_init(struct thread_data *td) |
a3f001f5 | 353 | { |
565e784d | 354 | struct hdfsio_data *hd = td->io_ops_data; |
a3f001f5 FB |
355 | struct hdfsio_options *options = td->eo; |
356 | int failure; | |
357 | struct hdfsBuilder *bld; | |
358 | ||
359 | if (options->host == NULL || options->port == 0) { | |
360 | log_err("hdfs: server not defined\n"); | |
361 | return EINVAL; | |
362 | } | |
363 | ||
364 | bld = hdfsNewBuilder(); | |
365 | if (!bld) { | |
366 | failure = errno; | |
367 | log_err("hdfs: unable to allocate connect builder\n"); | |
368 | return failure; | |
369 | } | |
370 | hdfsBuilderSetNameNode(bld, options->host); | |
371 | hdfsBuilderSetNameNodePort(bld, options->port); | |
372 | if(! options->single_instance) { | |
373 | hdfsBuilderSetForceNewInstance(bld); | |
374 | } | |
375 | hd->fs = hdfsBuilderConnect(bld); | |
376 | ||
6ef7e18d | 377 | /* hdfsSetWorkingDirectory succeed on non-existent directory */ |
a3f001f5 FB |
378 | if (hdfsExists(hd->fs, options->directory) < 0 || hdfsSetWorkingDirectory(hd->fs, options->directory) < 0) { |
379 | failure = errno; | |
380 | log_err("hdfs: invalid working directory %s: %s\n", options->directory, strerror(errno)); | |
381 | return failure; | |
382 | } | |
383 | ||
1b10477b MM |
384 | return 0; |
385 | } | |
386 | ||
a3f001f5 FB |
387 | static void fio_hdfsio_io_u_free(struct thread_data *td, struct io_u *io_u) |
388 | { | |
565e784d | 389 | struct hdfsio_data *hd = td->io_ops_data; |
a3f001f5 FB |
390 | |
391 | if (hd->fs && hdfsDisconnect(hd->fs) < 0) { | |
392 | log_err("hdfs: disconnect failed: %d\n", errno); | |
393 | } | |
394 | } | |
395 | ||
5a8a6a03 | 396 | FIO_STATIC struct ioengine_ops ioengine = { |
1b10477b MM |
397 | .name = "libhdfs", |
398 | .version = FIO_IOOPS_VERSION, | |
a3f001f5 | 399 | .flags = FIO_SYNCIO | FIO_DISKLESSIO | FIO_NODISKUTIL, |
1b10477b | 400 | .setup = fio_hdfsio_setup, |
a3f001f5 | 401 | .init = fio_hdfsio_init, |
1b10477b MM |
402 | .prep = fio_hdfsio_prep, |
403 | .queue = fio_hdfsio_queue, | |
404 | .open_file = fio_hdfsio_open_file, | |
405 | .close_file = fio_hdfsio_close_file, | |
a3f001f5 FB |
406 | .io_u_init = fio_hdfsio_io_u_init, |
407 | .io_u_free = fio_hdfsio_io_u_free, | |
408 | .option_struct_size = sizeof(struct hdfsio_options), | |
409 | .options = options, | |
1b10477b MM |
410 | }; |
411 | ||
a3f001f5 | 412 | |
1b10477b MM |
413 | static void fio_init fio_hdfsio_register(void) |
414 | { | |
5a8a6a03 | 415 | register_ioengine(&ioengine); |
1b10477b MM |
416 | } |
417 | ||
418 | static void fio_exit fio_hdfsio_unregister(void) | |
419 | { | |
5a8a6a03 | 420 | unregister_ioengine(&ioengine); |
1b10477b | 421 | } |