[PATCH] SG io engine: better handling of multiple files
[fio.git] / engines / net.c
1 /*
2  * Transfer data over the net.
3  */
4 #include <stdio.h>
5 #include <stdlib.h>
6 #include <unistd.h>
7 #include <errno.h>
8 #include <assert.h>
9 #include <netinet/in.h>
10 #include <arpa/inet.h>
11 #include <netdb.h>
12
13 #include "../fio.h"
14 #include "../os.h"
15
16 struct net_data {
17         int send_to_net;
18         struct io_u *last_io_u;
19 };
20
21 static int fio_netio_getevents(struct thread_data *td, int fio_unused min,
22                                 int max, struct timespec fio_unused *t)
23 {
24         assert(max <= 1);
25
26         /*
27          * we can only have one finished io_u for sync io, since the depth
28          * is always 1
29          */
30         if (list_empty(&td->io_u_busylist))
31                 return 0;
32
33         return 1;
34 }
35
36 static struct io_u *fio_netio_event(struct thread_data *td, int event)
37 {
38         struct net_data *nd = td->io_ops->data;
39
40         assert(event == 0);
41
42         return nd->last_io_u;
43 }
44
45 static int fio_netio_prep(struct thread_data *td, struct io_u *io_u)
46 {
47         struct net_data *nd = td->io_ops->data;
48         struct fio_file *f = io_u->file;
49
50         /*
51          * Make sure we don't see spurious reads to a receiver, and vice versa
52          */
53         if ((nd->send_to_net && io_u->ddir == DDIR_READ) ||
54             (!nd->send_to_net && io_u->ddir == DDIR_WRITE)) {
55                 printf("boo!\n");
56                 td_verror(td, EINVAL);
57                 return 1;
58         }
59                 
60         if (io_u->ddir == DDIR_SYNC)
61                 return 0;
62         if (io_u->offset == f->last_completed_pos)
63                 return 0;
64
65         /*
66          * If offset is different from last end position, it's a seek.
67          * As network io is purely sequential, we don't allow seeks.
68          */
69         td_verror(td, EINVAL);
70         return 1;
71 }
72
73 static int fio_netio_queue(struct thread_data *td, struct io_u *io_u)
74 {
75         struct net_data *nd = td->io_ops->data;
76         struct fio_file *f = io_u->file;
77         int ret, flags = 0;
78
79         if (io_u->ddir == DDIR_WRITE) {
80                 /*
81                  * if we are going to write more, set MSG_MORE
82                  */
83                 if (td->this_io_bytes[DDIR_WRITE] + io_u->xfer_buflen <
84                     td->io_size)
85                         flags = MSG_MORE;
86
87                 ret = send(f->fd, io_u->xfer_buf, io_u->xfer_buflen, flags);
88         } else if (io_u->ddir == DDIR_READ) {
89                 flags = MSG_WAITALL;
90                 ret = recv(f->fd, io_u->xfer_buf, io_u->xfer_buflen, flags);
91         } else
92                 ret = 0;        /* must be a SYNC */
93
94         if (ret != (int) io_u->xfer_buflen) {
95                 if (ret > 0) {
96                         io_u->resid = io_u->xfer_buflen - ret;
97                         io_u->error = 0;
98                         return ret;
99                 } else
100                         io_u->error = errno;
101         }
102
103         if (!io_u->error)
104                 nd->last_io_u = io_u;
105
106         return io_u->error;
107 }
108
109 static int fio_netio_setup_connect(struct thread_data *td, const char *host,
110                                    unsigned short port)
111 {
112         struct sockaddr_in addr;
113         struct fio_file *f;
114         int i;
115
116         memset(&addr, 0, sizeof(addr));
117         addr.sin_family = AF_INET;
118         addr.sin_port = htons(port);
119
120         if (inet_aton(host, &addr.sin_addr) != 1) {
121                 struct hostent *hent;
122
123                 hent = gethostbyname(host);
124                 if (!hent) {
125                         td_verror(td, errno);
126                         return 1;
127                 }
128
129                 memcpy(&addr.sin_addr, hent->h_addr, 4);
130         }
131
132         for_each_file(td, f, i) {
133                 f->fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
134                 if (f->fd < 0) {
135                         td_verror(td, errno);
136                         return 1;
137                 }
138
139                 if (connect(f->fd, (struct sockaddr *) &addr, sizeof(addr)) < 0) {
140                         td_verror(td, errno);
141                         return 1;
142                 }
143         }
144
145         return 0;
146
147 }
148
149 static int fio_netio_setup_listen(struct thread_data *td, unsigned short port)
150 {
151         struct sockaddr_in addr;
152         socklen_t socklen;
153         struct fio_file *f;
154         int fd, opt, i;
155
156         fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
157         if (fd < 0) {
158                 td_verror(td, errno);
159                 return 1;
160         }
161
162         opt = 1;
163         if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) {
164                 td_verror(td, errno);
165                 return 1;
166         }
167 #ifdef SO_REUSEPORT
168         if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt)) < 0) {
169                 td_verror(td, errno);
170                 return 1;
171         }
172 #endif
173
174         memset(&addr, 0, sizeof(addr));
175         addr.sin_family = AF_INET;
176         addr.sin_addr.s_addr = htonl(INADDR_ANY);
177         addr.sin_port = htons(port);
178
179         if (bind(fd, (struct sockaddr *) &addr, sizeof(addr)) < 0) {
180                 td_verror(td, errno);
181                 return 1;
182         }
183         if (listen(fd, 1) < 0) {
184                 td_verror(td, errno);
185                 return 1;
186         }
187
188         fprintf(f_out, "fio: waiting for %u connections\n", td->nr_files);
189
190         socklen = sizeof(addr);
191         for_each_file(td, f, i) {
192                 f->fd = accept(fd, (struct sockaddr *) &addr, &socklen);
193                 if (f->fd < 0) {
194                         td_verror(td, errno);
195                         return 1;
196                 }
197         }
198
199         return 0;
200 }
201
202 static int fio_netio_setup(struct thread_data *td)
203 {
204         char host[64], buf[128];
205         struct net_data *nd;
206         unsigned short port;
207         struct fio_file *f;
208         char *sep;
209         int ret, i;
210
211         if (!td->total_file_size) {
212                 log_err("fio: need size= set\n");
213                 return 1;
214         }
215
216         /*
217          * work around for late init call
218          */
219         if (td->io_ops->init(td))
220                 return 1;
221
222         nd = td->io_ops->data;
223
224         if (td->iomix) {
225                 log_err("fio: network connections must be read OR write\n");
226                 return 1;
227         }
228
229         strcpy(buf, td->filename);
230
231         sep = strchr(buf, ':');
232         if (!sep) {
233                 log_err("fio: bad network host:port <<%s>>\n", td->filename);
234                 return 1;
235         }
236
237         *sep = '\0';
238         sep++;
239         strcpy(host, buf);
240         port = atoi(sep);
241
242         if (td->ddir == READ) {
243                 nd->send_to_net = 0;
244                 ret = fio_netio_setup_listen(td, port);
245         } else {
246                 nd->send_to_net = 1;
247                 ret = fio_netio_setup_connect(td, host, port);
248         }
249
250         if (ret)
251                 return ret;
252
253         td->io_size = td->total_file_size;
254         td->total_io_size = td->io_size;
255
256         for_each_file(td, f, i) {
257                 f->file_size = td->total_file_size / td->nr_files;
258                 f->real_file_size = f->file_size;
259         }
260
261         return 0;
262 }
263
264 static void fio_netio_cleanup(struct thread_data *td)
265 {
266         if (td->io_ops->data) {
267                 free(td->io_ops->data);
268                 td->io_ops->data = NULL;
269         }
270 }
271
272 static int fio_netio_init(struct thread_data *td)
273 {
274         struct net_data *nd;
275
276         /*
277          * Hack to work-around the ->setup() function calling init on its
278          * own, since it needs ->io_ops->data to be set up.
279          */
280         if (td->io_ops->data)
281                 return 0;
282
283         nd  = malloc(sizeof(*nd));
284         nd->last_io_u = NULL;
285         td->io_ops->data = nd;
286         return 0;
287 }
288
289 static struct ioengine_ops ioengine = {
290         .name           = "net",
291         .version        = FIO_IOOPS_VERSION,
292         .init           = fio_netio_init,
293         .prep           = fio_netio_prep,
294         .queue          = fio_netio_queue,
295         .getevents      = fio_netio_getevents,
296         .event          = fio_netio_event,
297         .cleanup        = fio_netio_cleanup,
298         .setup          = fio_netio_setup,
299         .flags          = FIO_SYNCIO | FIO_NETIO,
300 };
301
302 static void fio_init fio_netio_register(void)
303 {
304         register_ioengine(&ioengine);
305 }
306
307 static void fio_exit fio_netio_unregister(void)
308 {
309         unregister_ioengine(&ioengine);
310 }