4 * This engine performs read/write operations on an HDFS cluster using
5 * libhdfs. HDFS does not support modifying a file's data once the file is created.
7 * To work around that, many small files (e.g. 256k each) are created, and this
8 * engine selects a file based on the offset generated by fio.
10 * Thus, random reads and writes can also be achieved with this logic.
12 * NOTE: please set the environment variables FIO_HDFS_BS (size of each file in bytes)
13 * and FIO_HDFS_FCOUNT (number of files) to appropriate values for this engine to work properly.
/* Number of data files (.f0, .f1, ...); read from ".fcount" or FIO_HDFS_FCOUNT. */
34 unsigned long fscount;
/* Id of the data file currently open in hd->fp; -1 (i.e. ULONG_MAX) means none is open. */
35 unsigned long curr_file_id;
/* Per-job offset added to every computed file id; set to getpid() % numjobs when the file is opened. */
37 unsigned int fid_correction;
/*
 * Load the data-file count and per-file size from the HDFS directory /.perftest.
 *
 * Copies sizeof(hd->fscount) raw bytes from ".fcount" into hd->fscount and
 * sizeof(hd->fsbs) raw bytes from ".fbs" into hd->fsbs. Those files must
 * therefore hold the values in this host's native binary layout for these fields.
 * hd->fp is used as a scratch handle and is left referring to a closed file.
 *
 * NOTE(review): hdfsRead()'s return value is ignored, so a short or failed
 * read leaves the field unchanged or partially overwritten. Whether the
 * hdfsOpenFile() results are NULL-checked depends on lines not shown here.
 */
40 static int fio_hdfsio_setup_fs_params(struct hdfsio_data *hd)
42 /* make sure that hdfsConnect is invoked before executing this function */
43 hdfsSetWorkingDirectory(hd->fs, "/.perftest");
44 hd->fp = hdfsOpenFile(hd->fs, ".fcount", O_RDONLY, 0, 0, 0);
46 hdfsRead(hd->fs, hd->fp, &(hd->fscount), sizeof(hd->fscount));
47 hdfsCloseFile(hd->fs, hd->fp);
49 hd->fp = hdfsOpenFile(hd->fs, ".fbs", O_RDONLY, 0, 0, 0);
51 hdfsRead(hd->fs, hd->fp, &(hd->fsbs), sizeof(hd->fsbs));
52 hdfsCloseFile(hd->fs, hd->fp);
/*
 * .prep hook: open the HDFS data file ".f<id>" that backs io_u->offset.
 *
 * The file id is io_u->offset / hd->fsbs plus this job's fid_correction.
 * If that file is already open, nothing is reopened. Otherwise the previous
 * file is closed and the new one is opened: read-only for DDIR_READ,
 * write-only for DDIR_WRITE. For reads, a file holding fewer than hd->fsbs
 * bytes is skipped in favour of the next id, wrapping modulo hd->fscount.
 */
58 static int fio_hdfsio_prep(struct thread_data *td, struct io_u *io_u)
60 struct hdfsio_data *hd;
66 hd = td->io_ops->data;
/* curr_file_id is unsigned long, so -1 here compares against ULONG_MAX ("no file open yet"). */
68 if (hd->curr_file_id == -1) {
69 /* see comment in fio_hdfsio_setup() function */
70 fio_hdfsio_setup_fs_params(hd);
73 /* find out file id based on the offset generated by fio */
/*
 * NOTE(review): no zero check on hd->fsbs is visible before this division.
 * Adding fid_correction without reducing modulo hd->fscount can also yield an
 * id past the last data file; only the retry step below wraps with % hd->fscount.
 */
74 f_id = (io_u->offset / hd->fsbs) + hd->fid_correction;
76 if (f_id == hd->curr_file_id) {
77 /* file is already open */
/* A different file is needed: close the one currently open, if any. */
81 if (hd->curr_file_id != -1) {
82 hdfsCloseFile(hd->fs, hd->fp);
85 if (io_u->ddir == DDIR_READ) {
86 open_flags = O_RDONLY;
87 } else if (io_u->ddir == DDIR_WRITE) {
88 open_flags = O_WRONLY;
90 log_err("hdfs: Invalid I/O Operation\n");
93 hd->curr_file_id = f_id;
/* NOTE(review): fname is declared outside the visible lines; snprintf(fname, sizeof(fname), ...) would bound this write. */
95 sprintf(fname, ".f%lu", f_id);
/*
 * NOTE(review): libhdfs documents hdfsGetPathInfo() as returning NULL on error
 * (e.g. a missing file), yet fi is dereferenced on the next line without a
 * visible check. No hdfsFreeFileInfo(fi, 1) is visible in the lines shown either.
 */
96 fi = hdfsGetPathInfo(hd->fs, fname);
97 if (fi->mSize >= hd->fsbs || io_u->ddir == DDIR_WRITE) {
98 /* file has enough data to read OR file is opened in write mode */
100 hdfsOpenFile(hd->fs, fname, open_flags, 0, 0,
106 /* file is empty, so try next file for reading */
/* NOTE(review): the modulo is undefined behaviour if hd->fscount is 0 (e.g. FIO_HDFS_FCOUNT parsed as 0). */
107 f_id = (f_id + 1) % hd->fscount;
/*
 * Convert the result of hdfsRead()/hdfsWrite() into fio completion state.
 *
 * ret: byte count (or error result) from the HDFS call made in fio_hdfsio_queue().
 * When ret differs from io_u->xfer_buflen, the shortfall is recorded in
 * io_u->resid on the path shown. The conditions guarding that path and the
 * td_verror() report are on lines not visible here. Both return statements
 * visible here return FIO_Q_COMPLETED.
 */
113 static int fio_io_end(struct thread_data *td, struct io_u *io_u, int ret)
115 if (ret != (int)io_u->xfer_buflen) {
117 io_u->resid = io_u->xfer_buflen - ret;
119 return FIO_Q_COMPLETED;
/* Report the I/O error recorded in io_u->error against this job. */
125 td_verror(td, io_u->error, "xfer");
127 return FIO_Q_COMPLETED;
/*
 * .queue hook: transfer io_u->xfer_buflen bytes between io_u->xfer_buf and
 * the file opened by fio_hdfsio_prep() (hd->fp). Uses hdfsRead() for
 * DDIR_READ and hdfsWrite() for DDIR_WRITE, and only logs an error for
 * other directions. The outcome in ret is finalized by fio_io_end().
 * The assignment of ret is on lines not shown here.
 */
130 static int fio_hdfsio_queue(struct thread_data *td, struct io_u *io_u)
132 struct hdfsio_data *hd;
135 hd = td->io_ops->data;
137 if (io_u->ddir == DDIR_READ) {
139 hdfsRead(hd->fs, hd->fp, io_u->xfer_buf, io_u->xfer_buflen);
140 } else if (io_u->ddir == DDIR_WRITE) {
142 hdfsWrite(hd->fs, hd->fp, io_u->xfer_buf,
145 log_err("hdfs: Invalid I/O Operation\n");
148 return fio_io_end(td, io_u, ret);
/*
 * .open_file hook: connect to the HDFS cluster at hd->host:hd->port, switch to
 * the /.perftest working directory, and derive this job's file-id offset
 * (fid_correction) from the process id.
 *
 * NOTE(review): getpid() % numjobs does not guarantee distinct values per job.
 * Two processes can collide, and jobs running as threads in one process all
 * get the same value, so jobs may map their offsets to the same files.
 * No check of the hdfsConnect() result is visible in the lines shown.
 */
151 int fio_hdfsio_open_file(struct thread_data *td, struct fio_file *f)
153 struct hdfsio_data *hd;
155 hd = td->io_ops->data;
156 hd->fs = hdfsConnect(hd->host, hd->port);
157 hdfsSetWorkingDirectory(hd->fs, "/.perftest");
158 hd->fid_correction = (getpid() % hd->numjobs);
/*
 * .close_file hook: disconnect this job from HDFS.
 *
 * NOTE(review): no hdfsCloseFile() for the data file that fio_hdfsio_prep()
 * may have left open in hd->fp is visible in the lines shown. Confirm it is
 * closed before disconnecting.
 */
163 int fio_hdfsio_close_file(struct thread_data *td, struct fio_file *f)
165 struct hdfsio_data *hd;
167 hd = td->io_ops->data;
168 hdfsDisconnect(hd->fs);
/*
 * .setup hook. On first use it allocates the engine's private hdfsio_data,
 * splits the job filename "host,port" into hd->host / hd->port, and obtains
 * the data-file count and per-file size. It then sets the fio file size to
 * fscount * fsbs, so fio's offsets span all data files.
 */
173 static int fio_hdfsio_setup(struct thread_data *td)
175 struct hdfsio_data *hd;
/*
 * Function-local static: remembers the largest numjobs seen across calls.
 * It is stored in hd->numjobs and used as the modulus for fid_correction.
 */
177 static unsigned int numjobs = 1; /* at least one job has to be there! */
178 numjobs = (td->o.numjobs > numjobs) ? td->o.numjobs : numjobs;
180 if (!td->io_ops->data) {
/* NOTE(review): no NULL check on malloc() is visible before the memset(); calloc(1, sizeof(*hd)) would replace both. */
181 hd = malloc(sizeof(*hd));;
183 memset(hd, 0, sizeof(*hd));
184 td->io_ops->data = hd;
186 /* separate host and port from filename */
/*
 * Rewrites "host,port" to "host port" in place in td->o.filename so that %s
 * stops at the host name.
 * NOTE(review): strchr() returns NULL when the filename has no ',', and the
 * result is dereferenced unconditionally.
 */
187 *(strchr(td->o.filename, ',')) = ' ';
/* NOTE(review): %s has no field width, so a long host name can overflow hd->host; sscanf's return value is not checked. */
188 sscanf(td->o.filename, "%s%d", hd->host, &(hd->port));
190 /* read fbs and fcount and based on that set f->real_file_size */
193 /* IMHO, this should be done here instead of fio_hdfsio_prep()
194 * but somehow calling it here doesn't seem to work,
195 * some problem with libhdfs that needs to be debugged */
/* NOTE(review): the lines around this direct-read path are not shown; confirm whether it is compiled in or disabled in favour of the environment variables below. */
196 hd->fs = hdfsConnect(hd->host, hd->port);
197 fio_hdfsio_setup_fs_params(hd);
198 hdfsDisconnect(hd->fs);
200 /* so, as an alternate, using environment variables */
/*
 * Values are parsed with atol(), so non-numeric text silently becomes 0.
 * A zero FIO_HDFS_BS or FIO_HDFS_FCOUNT later causes division/modulo by
 * zero in fio_hdfsio_prep().
 */
201 if (getenv("FIO_HDFS_FCOUNT") && getenv("FIO_HDFS_BS")) {
202 hd->fscount = atol(getenv("FIO_HDFS_FCOUNT"));
203 hd->fsbs = atol(getenv("FIO_HDFS_BS"));
205 log_err("FIO_HDFS_FCOUNT and/or FIO_HDFS_BS not set.\n");
/* f is declared and assigned on lines not shown; its size covers every data file. */
209 f->real_file_size = hd->fscount * hd->fsbs;
/* No data file is open yet; fio_hdfsio_prep() treats -1 as "none open". */
212 hd->curr_file_id = -1;
213 hd->numjobs = numjobs;
214 fio_file_set_size_known(f);
/* fio ioengine operations table wiring the libhdfs callbacks defined above. */
220 static struct ioengine_ops ioengine_hdfs = {
222 .version = FIO_IOOPS_VERSION,
223 .setup = fio_hdfsio_setup,
224 .prep = fio_hdfsio_prep,
225 .queue = fio_hdfsio_queue,
226 .open_file = fio_hdfsio_open_file,
227 .close_file = fio_hdfsio_close_file,
/* Make ioengine_hdfs available to fio (function is marked fio_init). */
231 static void fio_init fio_hdfsio_register(void)
233 register_ioengine(&ioengine_hdfs);
/* Remove ioengine_hdfs from fio's engine list (function is marked fio_exit). */
236 static void fio_exit fio_hdfsio_unregister(void)
238 unregister_ioengine(&ioengine_hdfs);