Commit | Line | Data |
---|---|---|
1b10477b MM |
1 | /* |
2 | * libhdfs engine | |
3 | * | |
4 | * this engine helps perform read/write operations on an hdfs cluster using | |
5 | * libhdfs. hdfs does not support modification of data once a file is created. | |
6 | * | |
7 | * so, to mimic that, create many files of small size (e.g. 256k); this | |
8 | * engine selects a file based on the offset generated by fio. | |
9 | * | |
10 | * thus, random reads and writes can also be achieved with this logic. | |
11 | * | |
12 | * NOTE: please set the environment variables FIO_HDFS_BS and FIO_HDFS_FCOUNT | |
13 | * to appropriate values for this engine to work properly | |
14 | * | |
15 | */ | |
16 | ||
17 | #include <stdio.h> | |
18 | #include <stdlib.h> | |
19 | #include <unistd.h> | |
20 | #include <sys/uio.h> | |
21 | #include <errno.h> | |
22 | #include <assert.h> | |
23 | ||
24 | #include "../fio.h" | |
25 | ||
26 | #include "hdfs.h" | |
27 | ||
28 | struct hdfsio_data { | |
29 | char host[256]; | |
30 | int port; | |
31 | hdfsFS fs; | |
32 | hdfsFile fp; | |
33 | unsigned long fsbs; | |
34 | unsigned long fscount; | |
35 | unsigned long curr_file_id; | |
36 | unsigned int numjobs; | |
37 | unsigned int fid_correction; | |
38 | }; | |
39 | ||
40 | static int fio_hdfsio_setup_fs_params(struct hdfsio_data *hd) | |
41 | { | |
42 | /* make sure that hdfsConnect is invoked before executing this function */ | |
43 | hdfsSetWorkingDirectory(hd->fs, "/.perftest"); | |
44 | hd->fp = hdfsOpenFile(hd->fs, ".fcount", O_RDONLY, 0, 0, 0); | |
45 | if (hd->fp) { | |
46 | hdfsRead(hd->fs, hd->fp, &(hd->fscount), sizeof(hd->fscount)); | |
47 | hdfsCloseFile(hd->fs, hd->fp); | |
48 | } | |
49 | hd->fp = hdfsOpenFile(hd->fs, ".fbs", O_RDONLY, 0, 0, 0); | |
50 | if (hd->fp) { | |
51 | hdfsRead(hd->fs, hd->fp, &(hd->fsbs), sizeof(hd->fsbs)); | |
52 | hdfsCloseFile(hd->fs, hd->fp); | |
53 | } | |
54 | ||
55 | return 0; | |
56 | } | |
57 | ||
58 | static int fio_hdfsio_prep(struct thread_data *td, struct io_u *io_u) | |
59 | { | |
60 | struct hdfsio_data *hd; | |
61 | hdfsFileInfo *fi; | |
62 | unsigned long f_id; | |
63 | char fname[80]; | |
64 | int open_flags = 0; | |
65 | ||
66 | hd = td->io_ops->data; | |
67 | ||
68 | if (hd->curr_file_id == -1) { | |
69 | /* see comment in fio_hdfsio_setup() function */ | |
70 | fio_hdfsio_setup_fs_params(hd); | |
71 | } | |
72 | ||
73 | /* find out file id based on the offset generated by fio */ | |
74 | f_id = (io_u->offset / hd->fsbs) + hd->fid_correction; | |
75 | ||
76 | if (f_id == hd->curr_file_id) { | |
77 | /* file is already open */ | |
78 | return 0; | |
79 | } | |
80 | ||
81 | if (hd->curr_file_id != -1) { | |
82 | hdfsCloseFile(hd->fs, hd->fp); | |
83 | } | |
84 | ||
85 | if (io_u->ddir == DDIR_READ) { | |
86 | open_flags = O_RDONLY; | |
87 | } else if (io_u->ddir == DDIR_WRITE) { | |
88 | open_flags = O_WRONLY; | |
89 | } else { | |
7d4a8e7e | 90 | log_err("hdfs: Invalid I/O Operation\n"); |
1b10477b MM |
91 | } |
92 | ||
93 | hd->curr_file_id = f_id; | |
94 | do { | |
95 | sprintf(fname, ".f%lu", f_id); | |
96 | fi = hdfsGetPathInfo(hd->fs, fname); | |
97 | if (fi->mSize >= hd->fsbs || io_u->ddir == DDIR_WRITE) { | |
98 | /* file has enough data to read OR file is opened in write mode */ | |
99 | hd->fp = | |
100 | hdfsOpenFile(hd->fs, fname, open_flags, 0, 0, | |
101 | hd->fsbs); | |
102 | if (hd->fp) { | |
103 | break; | |
104 | } | |
105 | } | |
106 | /* file is empty, so try next file for reading */ | |
107 | f_id = (f_id + 1) % hd->fscount; | |
108 | } while (1); | |
109 | ||
110 | return 0; | |
111 | } | |
112 | ||
113 | static int fio_io_end(struct thread_data *td, struct io_u *io_u, int ret) | |
114 | { | |
115 | if (ret != (int)io_u->xfer_buflen) { | |
116 | if (ret >= 0) { | |
117 | io_u->resid = io_u->xfer_buflen - ret; | |
118 | io_u->error = 0; | |
119 | return FIO_Q_COMPLETED; | |
120 | } else | |
121 | io_u->error = errno; | |
122 | } | |
123 | ||
124 | if (io_u->error) | |
125 | td_verror(td, io_u->error, "xfer"); | |
126 | ||
127 | return FIO_Q_COMPLETED; | |
128 | } | |
129 | ||
130 | static int fio_hdfsio_queue(struct thread_data *td, struct io_u *io_u) | |
131 | { | |
132 | struct hdfsio_data *hd; | |
133 | int ret = 0; | |
134 | ||
135 | hd = td->io_ops->data; | |
136 | ||
137 | if (io_u->ddir == DDIR_READ) { | |
138 | ret = | |
139 | hdfsRead(hd->fs, hd->fp, io_u->xfer_buf, io_u->xfer_buflen); | |
140 | } else if (io_u->ddir == DDIR_WRITE) { | |
141 | ret = | |
142 | hdfsWrite(hd->fs, hd->fp, io_u->xfer_buf, | |
143 | io_u->xfer_buflen); | |
144 | } else { | |
7d4a8e7e | 145 | log_err("hdfs: Invalid I/O Operation\n"); |
1b10477b MM |
146 | } |
147 | ||
148 | return fio_io_end(td, io_u, ret); | |
149 | } | |
150 | ||
151 | int fio_hdfsio_open_file(struct thread_data *td, struct fio_file *f) | |
152 | { | |
153 | struct hdfsio_data *hd; | |
154 | ||
155 | hd = td->io_ops->data; | |
156 | hd->fs = hdfsConnect(hd->host, hd->port); | |
157 | hdfsSetWorkingDirectory(hd->fs, "/.perftest"); | |
158 | hd->fid_correction = (getpid() % hd->numjobs); | |
159 | ||
160 | return 0; | |
161 | } | |
162 | ||
163 | int fio_hdfsio_close_file(struct thread_data *td, struct fio_file *f) | |
164 | { | |
165 | struct hdfsio_data *hd; | |
166 | ||
167 | hd = td->io_ops->data; | |
168 | hdfsDisconnect(hd->fs); | |
169 | ||
170 | return 0; | |
171 | } | |
172 | ||
173 | static int fio_hdfsio_setup(struct thread_data *td) | |
174 | { | |
175 | struct hdfsio_data *hd; | |
176 | struct fio_file *f; | |
177 | static unsigned int numjobs = 1; /* atleast one job has to be there! */ | |
178 | numjobs = (td->o.numjobs > numjobs) ? td->o.numjobs : numjobs; | |
179 | ||
180 | if (!td->io_ops->data) { | |
181 | hd = malloc(sizeof(*hd));; | |
182 | ||
183 | memset(hd, 0, sizeof(*hd)); | |
184 | td->io_ops->data = hd; | |
185 | ||
186 | /* separate host and port from filename */ | |
187 | *(strchr(td->o.filename, ',')) = ' '; | |
188 | sscanf(td->o.filename, "%s%d", hd->host, &(hd->port)); | |
189 | ||
190 | /* read fbs and fcount and based on that set f->real_file_size */ | |
191 | f = td->files[0]; | |
192 | #if 0 | |
193 | /* IMHO, this should be done here instead of fio_hdfsio_prep() | |
194 | * but somehow calling it here doesn't seem to work, | |
195 | * some problem with libhdfs that needs to be debugged */ | |
196 | hd->fs = hdfsConnect(hd->host, hd->port); | |
197 | fio_hdfsio_setup_fs_params(hd); | |
198 | hdfsDisconnect(hd->fs); | |
199 | #else | |
200 | /* so, as an alternate, using environment variables */ | |
201 | if (getenv("FIO_HDFS_FCOUNT") && getenv("FIO_HDFS_BS")) { | |
202 | hd->fscount = atol(getenv("FIO_HDFS_FCOUNT")); | |
203 | hd->fsbs = atol(getenv("FIO_HDFS_BS")); | |
204 | } else { | |
7d4a8e7e | 205 | log_err("FIO_HDFS_FCOUNT and/or FIO_HDFS_BS not set.\n"); |
1b10477b MM |
206 | return 1; |
207 | } | |
208 | #endif | |
209 | f->real_file_size = hd->fscount * hd->fsbs; | |
210 | ||
211 | td->o.nr_files = 1; | |
212 | hd->curr_file_id = -1; | |
213 | hd->numjobs = numjobs; | |
214 | fio_file_set_size_known(f); | |
215 | } | |
216 | ||
217 | return 0; | |
218 | } | |
219 | ||
220 | static struct ioengine_ops ioengine_hdfs = { | |
221 | .name = "libhdfs", | |
222 | .version = FIO_IOOPS_VERSION, | |
223 | .setup = fio_hdfsio_setup, | |
224 | .prep = fio_hdfsio_prep, | |
225 | .queue = fio_hdfsio_queue, | |
226 | .open_file = fio_hdfsio_open_file, | |
227 | .close_file = fio_hdfsio_close_file, | |
228 | .flags = FIO_SYNCIO, | |
229 | }; | |
230 | ||
231 | static void fio_init fio_hdfsio_register(void) | |
232 | { | |
233 | register_ioengine(&ioengine_hdfs); | |
234 | } | |
235 | ||
236 | static void fio_exit fio_hdfsio_unregister(void) | |
237 | { | |
238 | unregister_ioengine(&ioengine_hdfs); | |
239 | } |