773d46b47bfc28bd6dba373b27fed13a8b7c0324
[fio.git] / engines / libhdfs.c
1 /*
2  * libhdfs engine
3  *
4  * this engine helps perform read/write operations on hdfs cluster using
5  * libhdfs. hdfs doesnot support modification of data once file is created.
6  *
7  * so to mimic that create many files of small size (e.g 256k), and this
8  * engine select a file based on the offset generated by fio.
9  *
10  * thus, random reads and writes can also be achieved with this logic.
11  *
12  * NOTE: please set environment variables FIO_HDFS_BS and FIO_HDFS_FCOUNT
13  * to appropriate value to work this engine properly
14  *
15  */
16
17 #include <stdio.h>
18 #include <stdlib.h>
19 #include <unistd.h>
20 #include <sys/uio.h>
21 #include <errno.h>
22 #include <assert.h>
23
24 #include "../fio.h"
25
26 #include "hdfs.h"
27
28 struct hdfsio_data {
29         char host[256];
30         int port;
31         hdfsFS fs;
32         hdfsFile fp;
33         unsigned long fsbs;
34         unsigned long fscount;
35         unsigned long curr_file_id;
36         unsigned int numjobs;
37         unsigned int fid_correction;
38 };
39
40 static int fio_hdfsio_setup_fs_params(struct hdfsio_data *hd)
41 {
42         /* make sure that hdfsConnect is invoked before executing this function */
43         hdfsSetWorkingDirectory(hd->fs, "/.perftest");
44         hd->fp = hdfsOpenFile(hd->fs, ".fcount", O_RDONLY, 0, 0, 0);
45         if (hd->fp) {
46                 hdfsRead(hd->fs, hd->fp, &(hd->fscount), sizeof(hd->fscount));
47                 hdfsCloseFile(hd->fs, hd->fp);
48         }
49         hd->fp = hdfsOpenFile(hd->fs, ".fbs", O_RDONLY, 0, 0, 0);
50         if (hd->fp) {
51                 hdfsRead(hd->fs, hd->fp, &(hd->fsbs), sizeof(hd->fsbs));
52                 hdfsCloseFile(hd->fs, hd->fp);
53         }
54
55         return 0;
56 }
57
58 static int fio_hdfsio_prep(struct thread_data *td, struct io_u *io_u)
59 {
60         struct hdfsio_data *hd;
61         hdfsFileInfo *fi;
62         unsigned long f_id;
63         char fname[80];
64         int open_flags = 0;
65
66         hd = td->io_ops->data;
67
68         if (hd->curr_file_id == -1) {
69                 /* see comment in fio_hdfsio_setup() function */
70                 fio_hdfsio_setup_fs_params(hd);
71         }
72
73         /* find out file id based on the offset generated by fio */
74         f_id = (io_u->offset / hd->fsbs) + hd->fid_correction;
75
76         if (f_id == hd->curr_file_id) {
77                 /* file is already open */
78                 return 0;
79         }
80
81         if (hd->curr_file_id != -1) {
82                 hdfsCloseFile(hd->fs, hd->fp);
83         }
84
85         if (io_u->ddir == DDIR_READ) {
86                 open_flags = O_RDONLY;
87         } else if (io_u->ddir == DDIR_WRITE) {
88                 open_flags = O_WRONLY;
89         } else {
90                 printf("Invalid I/O Operation\n");
91         }
92
93         hd->curr_file_id = f_id;
94         do {
95                 sprintf(fname, ".f%lu", f_id);
96                 fi = hdfsGetPathInfo(hd->fs, fname);
97                 if (fi->mSize >= hd->fsbs || io_u->ddir == DDIR_WRITE) {
98                         /* file has enough data to read OR file is opened in write mode */
99                         hd->fp =
100                             hdfsOpenFile(hd->fs, fname, open_flags, 0, 0,
101                                          hd->fsbs);
102                         if (hd->fp) {
103                                 break;
104                         }
105                 }
106                 /* file is empty, so try next file for reading */
107                 f_id = (f_id + 1) % hd->fscount;
108         } while (1);
109
110         return 0;
111 }
112
113 static int fio_io_end(struct thread_data *td, struct io_u *io_u, int ret)
114 {
115         if (ret != (int)io_u->xfer_buflen) {
116                 if (ret >= 0) {
117                         io_u->resid = io_u->xfer_buflen - ret;
118                         io_u->error = 0;
119                         return FIO_Q_COMPLETED;
120                 } else
121                         io_u->error = errno;
122         }
123
124         if (io_u->error)
125                 td_verror(td, io_u->error, "xfer");
126
127         return FIO_Q_COMPLETED;
128 }
129
130 static int fio_hdfsio_queue(struct thread_data *td, struct io_u *io_u)
131 {
132         struct hdfsio_data *hd;
133         int ret = 0;
134
135         hd = td->io_ops->data;
136
137         if (io_u->ddir == DDIR_READ) {
138                 ret =
139                     hdfsRead(hd->fs, hd->fp, io_u->xfer_buf, io_u->xfer_buflen);
140         } else if (io_u->ddir == DDIR_WRITE) {
141                 ret =
142                     hdfsWrite(hd->fs, hd->fp, io_u->xfer_buf,
143                               io_u->xfer_buflen);
144         } else {
145                 printf("Invalid I/O Operation\n");
146         }
147
148         return fio_io_end(td, io_u, ret);
149 }
150
151 int fio_hdfsio_open_file(struct thread_data *td, struct fio_file *f)
152 {
153         struct hdfsio_data *hd;
154
155         hd = td->io_ops->data;
156         hd->fs = hdfsConnect(hd->host, hd->port);
157         hdfsSetWorkingDirectory(hd->fs, "/.perftest");
158         hd->fid_correction = (getpid() % hd->numjobs);
159
160         return 0;
161 }
162
163 int fio_hdfsio_close_file(struct thread_data *td, struct fio_file *f)
164 {
165         struct hdfsio_data *hd;
166
167         hd = td->io_ops->data;
168         hdfsDisconnect(hd->fs);
169
170         return 0;
171 }
172
173 static int fio_hdfsio_setup(struct thread_data *td)
174 {
175         struct hdfsio_data *hd;
176         struct fio_file *f;
177         static unsigned int numjobs = 1;        /* atleast one job has to be there! */
178         numjobs = (td->o.numjobs > numjobs) ? td->o.numjobs : numjobs;
179
180         if (!td->io_ops->data) {
181                 hd = malloc(sizeof(*hd));;
182
183                 memset(hd, 0, sizeof(*hd));
184                 td->io_ops->data = hd;
185
186                 /* separate host and port from filename */
187                 *(strchr(td->o.filename, ',')) = ' ';
188                 sscanf(td->o.filename, "%s%d", hd->host, &(hd->port));
189
190                 /* read fbs and fcount and based on that set f->real_file_size */
191                 f = td->files[0];
192 #if 0
193                 /* IMHO, this should be done here instead of fio_hdfsio_prep()
194                  * but somehow calling it here doesn't seem to work,
195                  * some problem with libhdfs that needs to be debugged */
196                 hd->fs = hdfsConnect(hd->host, hd->port);
197                 fio_hdfsio_setup_fs_params(hd);
198                 hdfsDisconnect(hd->fs);
199 #else
200                 /* so, as an alternate, using environment variables */
201                 if (getenv("FIO_HDFS_FCOUNT") && getenv("FIO_HDFS_BS")) {
202                         hd->fscount = atol(getenv("FIO_HDFS_FCOUNT"));
203                         hd->fsbs = atol(getenv("FIO_HDFS_BS"));
204                 } else {
205                         fprintf(stderr,
206                                 "FIO_HDFS_FCOUNT and/or FIO_HDFS_BS not set.\n");
207                         return 1;
208                 }
209 #endif
210                 f->real_file_size = hd->fscount * hd->fsbs;
211
212                 td->o.nr_files = 1;
213                 hd->curr_file_id = -1;
214                 hd->numjobs = numjobs;
215                 fio_file_set_size_known(f);
216         }
217
218         return 0;
219 }
220
221 static struct ioengine_ops ioengine_hdfs = {
222         .name = "libhdfs",
223         .version = FIO_IOOPS_VERSION,
224         .setup = fio_hdfsio_setup,
225         .prep = fio_hdfsio_prep,
226         .queue = fio_hdfsio_queue,
227         .open_file = fio_hdfsio_open_file,
228         .close_file = fio_hdfsio_close_file,
229         .flags = FIO_SYNCIO,
230 };
231
232 static void fio_init fio_hdfsio_register(void)
233 {
234         register_ioengine(&ioengine_hdfs);
235 }
236
237 static void fio_exit fio_hdfsio_unregister(void)
238 {
239         unregister_ioengine(&ioengine_hdfs);
240 }