Add the file sharing bits
[fio.git] / engines / net.c
1 /*
2  * net engine
3  *
4  * IO engine that reads/writes to/from sockets.
5  *
6  */
7 #include <stdio.h>
8 #include <stdlib.h>
9 #include <unistd.h>
10 #include <errno.h>
11 #include <assert.h>
12 #include <netinet/in.h>
13 #include <arpa/inet.h>
14 #include <netdb.h>
15 #include <sys/poll.h>
16
17 #include "../fio.h"
18
19 struct netio_data {
20         int listenfd;
21         int send_to_net;
22         int use_splice;
23         int pipes[2];
24         char host[64];
25         struct sockaddr_in addr;
26 };
27
28 static int fio_netio_prep(struct thread_data *td, struct io_u *io_u)
29 {
30         struct netio_data *nd = td->io_ops->data;
31
32         /*
33          * Make sure we don't see spurious reads to a receiver, and vice versa
34          */
35         if ((nd->send_to_net && io_u->ddir == DDIR_READ) ||
36             (!nd->send_to_net && io_u->ddir == DDIR_WRITE)) {
37                 td_verror(td, EINVAL, "bad direction");
38                 return 1;
39         }
40                 
41         return 0;
42 }
43
44 static int splice_io_u(int fdin, int fdout, unsigned int len)
45 {
46         int bytes = 0;
47
48         while (len) {
49                 int ret = splice(fdin, NULL, fdout, NULL, len, 0);
50
51                 if (ret < 0) {
52                         if (!bytes)
53                                 bytes = ret;
54
55                         break;
56                 } else if (!ret)
57                         break;
58
59                 bytes += ret;
60                 len -= ret;
61         }
62
63         return bytes;
64 }
65
66 /*
67  * Receive bytes from a socket and fill them into the internal pipe
68  */
69 static int splice_in(struct thread_data *td, struct io_u *io_u)
70 {
71         struct netio_data *nd = td->io_ops->data;
72
73         return splice_io_u(io_u->file->fd, nd->pipes[1], io_u->xfer_buflen);
74 }
75
76 /*
77  * Transmit 'len' bytes from the internal pipe
78  */
79 static int splice_out(struct thread_data *td, struct io_u *io_u,
80                       unsigned int len)
81 {
82         struct netio_data *nd = td->io_ops->data;
83
84         return splice_io_u(nd->pipes[0], io_u->file->fd, len);
85 }
86
87 static int vmsplice_io_u(struct io_u *io_u, int fd, unsigned int len)
88 {
89         struct iovec iov = {
90                 .iov_base = io_u->xfer_buf,
91                 .iov_len = len,
92         };
93         int bytes = 0;
94
95         while (iov.iov_len) {
96                 int ret = vmsplice(fd, &iov, 1, SPLICE_F_MOVE);
97
98                 if (ret < 0) {
99                         if (!bytes)
100                                 bytes = ret;
101                         break;
102                 } else if (!ret)
103                         break;
104
105                 iov.iov_len -= ret;
106                 iov.iov_base += ret;
107                 bytes += ret;
108         }
109
110         return bytes;
111
112 }
113
114 /*
115  * vmsplice() pipe to io_u buffer
116  */
117 static int vmsplice_io_u_out(struct thread_data *td, struct io_u *io_u,
118                              unsigned int len)
119 {
120         struct netio_data *nd = td->io_ops->data;
121
122         return vmsplice_io_u(io_u, nd->pipes[0], len);
123 }
124
125 /*
126  * vmsplice() io_u to pipe
127  */
128 static int vmsplice_io_u_in(struct thread_data *td, struct io_u *io_u)
129 {
130         struct netio_data *nd = td->io_ops->data;
131
132         return vmsplice_io_u(io_u, nd->pipes[1], io_u->xfer_buflen);
133 }
134
135 /*
136  * splice receive - transfer socket data into a pipe using splice, then map
137  * that pipe data into the io_u using vmsplice.
138  */
139 static int fio_netio_splice_in(struct thread_data *td, struct io_u *io_u)
140 {
141         int ret;
142
143         ret = splice_in(td, io_u);
144         if (ret > 0)
145                 return vmsplice_io_u_out(td, io_u, ret);
146
147         return ret;
148 }
149
150 /*
151  * splice transmit - map data from the io_u into a pipe by using vmsplice,
152  * then transfer that pipe to a socket using splice.
153  */
154 static int fio_netio_splice_out(struct thread_data *td, struct io_u *io_u)
155 {
156         int ret;
157
158         ret = vmsplice_io_u_in(td, io_u);
159         if (ret > 0)
160                 return splice_out(td, io_u, ret);
161
162         return ret;
163 }
164
165 static int fio_netio_send(struct thread_data *td, struct io_u *io_u)
166 {
167         int flags = 0;
168
169         /*
170          * if we are going to write more, set MSG_MORE
171          */
172         if (td->this_io_bytes[DDIR_WRITE] + io_u->xfer_buflen < td->o.size)
173                 flags = MSG_MORE;
174
175         return send(io_u->file->fd, io_u->xfer_buf, io_u->xfer_buflen, flags);
176 }
177
178 static int fio_netio_recv(struct io_u *io_u)
179 {
180         int flags = MSG_WAITALL;
181
182         return recv(io_u->file->fd, io_u->xfer_buf, io_u->xfer_buflen, flags);
183 }
184
185 static int fio_netio_queue(struct thread_data *td, struct io_u *io_u)
186 {
187         struct netio_data *nd = td->io_ops->data;
188         int ret;
189
190         fio_ro_check(td, io_u);
191
192         if (io_u->ddir == DDIR_WRITE) {
193                 if (nd->use_splice)
194                         ret = fio_netio_splice_out(td, io_u);
195                 else
196                         ret = fio_netio_send(td, io_u);
197         } else if (io_u->ddir == DDIR_READ) {
198                 if (nd->use_splice)
199                         ret = fio_netio_splice_in(td, io_u);
200                 else
201                         ret = fio_netio_recv(io_u);
202         } else
203                 ret = 0;        /* must be a SYNC */
204
205         if (ret != (int) io_u->xfer_buflen) {
206                 if (ret >= 0) {
207                         io_u->resid = io_u->xfer_buflen - ret;
208                         io_u->error = 0;
209                         return FIO_Q_COMPLETED;
210                 } else
211                         io_u->error = errno;
212         }
213
214         if (io_u->error)
215                 td_verror(td, io_u->error, "xfer");
216
217         return FIO_Q_COMPLETED;
218 }
219
220 static int fio_netio_connect(struct thread_data *td, struct fio_file *f)
221 {
222         struct netio_data *nd = td->io_ops->data;
223
224         f->fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
225         if (f->fd < 0) {
226                 td_verror(td, errno, "socket");
227                 return 1;
228         }
229
230         if (connect(f->fd, (struct sockaddr *) &nd->addr, sizeof(nd->addr)) < 0) {
231                 td_verror(td, errno, "connect");
232                 return 1;
233         }
234
235         return 0;
236 }
237
238 static int fio_netio_accept(struct thread_data *td, struct fio_file *f)
239 {
240         struct netio_data *nd = td->io_ops->data;
241         socklen_t socklen = sizeof(nd->addr);
242         struct pollfd pfd;
243         int ret;
244
245         log_info("fio: waiting for connection\n");
246
247         /*
248          * Accept loop. poll for incoming events, accept them. Repeat until we
249          * have all connections.
250          */
251         while (!td->terminate) {
252                 pfd.fd = nd->listenfd;
253                 pfd.events = POLLIN;
254
255                 ret = poll(&pfd, 1, -1);
256                 printf("got ret %d\n", ret);
257                 if (ret < 0) {
258                         if (errno == EINTR)
259                                 continue;
260
261                         td_verror(td, errno, "poll");
262                         break;
263                 } else if (!ret)
264                         continue;
265
266                 /*
267                  * should be impossible
268                  */
269                 if (!(pfd.revents & POLLIN))
270                         continue;
271
272                 f->fd = accept(nd->listenfd, (struct sockaddr *) &nd->addr, &socklen);
273                 if (f->fd < 0) {
274                         td_verror(td, errno, "accept");
275                         return 1;
276                 }
277                 break;
278         }
279
280         return 0;
281 }
282
283 static int fio_netio_open_file(struct thread_data *td, struct fio_file *f)
284 {
285         if (td_read(td))
286                 return fio_netio_accept(td, f);
287         else
288                 return fio_netio_connect(td, f);
289 }
290
291 static int fio_netio_setup_connect(struct thread_data *td, const char *host,
292                                    unsigned short port)
293 {
294         struct netio_data *nd = td->io_ops->data;
295
296         nd->addr.sin_family = AF_INET;
297         nd->addr.sin_port = htons(port);
298
299         if (inet_aton(host, &nd->addr.sin_addr) != 1) {
300                 struct hostent *hent;
301
302                 hent = gethostbyname(host);
303                 if (!hent) {
304                         td_verror(td, errno, "gethostbyname");
305                         return 1;
306                 }
307
308                 memcpy(&nd->addr.sin_addr, hent->h_addr, 4);
309         }
310
311         return 0;
312 }
313
314 static int fio_netio_setup_listen(struct thread_data *td, short port)
315 {
316         struct netio_data *nd = td->io_ops->data;
317         int fd, opt;
318
319         fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
320         if (fd < 0) {
321                 td_verror(td, errno, "socket");
322                 return 1;
323         }
324
325         opt = 1;
326         if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) {
327                 td_verror(td, errno, "setsockopt");
328                 return 1;
329         }
330 #ifdef SO_REUSEPORT
331         if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt)) < 0) {
332                 td_verror(td, errno, "setsockopt");
333                 return 1;
334         }
335 #endif
336
337         nd->addr.sin_family = AF_INET;
338         nd->addr.sin_addr.s_addr = htonl(INADDR_ANY);
339         nd->addr.sin_port = htons(port);
340
341         if (bind(fd, (struct sockaddr *) &nd->addr, sizeof(nd->addr)) < 0) {
342                 td_verror(td, errno, "bind");
343                 return 1;
344         }
345         if (listen(fd, 1) < 0) {
346                 td_verror(td, errno, "listen");
347                 return 1;
348         }
349
350         nd->listenfd = fd;
351         return 0;
352 }
353
354 static int fio_netio_init(struct thread_data *td)
355 {
356         struct netio_data *nd = td->io_ops->data;
357         unsigned short port;
358         char host[64], buf[128];
359         char *sep;
360         int ret;
361
362         if (td_rw(td)) {
363                 log_err("fio: network connections must be read OR write\n");
364                 return 1;
365         }
366         if (td_random(td)) {
367                 log_err("fio: network IO can't be random\n");
368                 return 1;
369         }
370
371         strcpy(buf, td->o.filename);
372
373         sep = strchr(buf, '/');
374         if (!sep) {
375                 log_err("fio: bad network host/port <<%s>>\n", td->o.filename);
376                 return 1;
377         }
378
379         *sep = '\0';
380         sep++;
381         strcpy(host, buf);
382         port = atoi(sep);
383
384         if (td_read(td)) {
385                 nd->send_to_net = 0;
386                 ret = fio_netio_setup_listen(td, port);
387         } else {
388                 nd->send_to_net = 1;
389                 ret = fio_netio_setup_connect(td, host, port);
390         }
391
392         return ret;
393 }
394
395 static void fio_netio_cleanup(struct thread_data *td)
396 {
397         struct netio_data *nd = td->io_ops->data;
398
399         if (nd) {
400                 if (nd->listenfd != -1)
401                         close(nd->listenfd);
402                 if (nd->pipes[0] != -1)
403                         close(nd->pipes[0]);
404                 if (nd->pipes[1] != -1)
405                         close(nd->pipes[1]);
406
407                 free(nd);
408                 td->io_ops->data = NULL;
409         }
410 }
411
412 static int fio_netio_setup(struct thread_data *td)
413 {
414         struct netio_data *nd;
415
416         if (!td->io_ops->data) {
417                 nd = malloc(sizeof(*nd));;
418
419                 memset(nd, 0, sizeof(*nd));
420                 nd->listenfd = -1;
421                 nd->pipes[0] = nd->pipes[1] = -1;
422                 td->io_ops->data = nd;
423         }
424
425         return 0;
426 }
427
428 static int fio_netio_setup_splice(struct thread_data *td)
429 {
430         struct netio_data *nd;
431
432         fio_netio_setup(td);
433
434         nd = td->io_ops->data;
435         if (nd) {
436                 if (pipe(nd->pipes) < 0)
437                         return 1;
438
439                 nd->use_splice = 1;
440                 return 0;
441         }
442
443         return 1;
444 }
445
446 static struct ioengine_ops ioengine_rw = {
447         .name           = "net",
448         .version        = FIO_IOOPS_VERSION,
449         .prep           = fio_netio_prep,
450         .queue          = fio_netio_queue,
451         .setup          = fio_netio_setup,
452         .init           = fio_netio_init,
453         .cleanup        = fio_netio_cleanup,
454         .open_file      = fio_netio_open_file,
455         .close_file     = generic_close_file,
456         .flags          = FIO_SYNCIO | FIO_DISKLESSIO | FIO_UNIDIR |
457                           FIO_SIGQUIT,
458 };
459
460 static struct ioengine_ops ioengine_splice = {
461         .name           = "netsplice",
462         .version        = FIO_IOOPS_VERSION,
463         .prep           = fio_netio_prep,
464         .queue          = fio_netio_queue,
465         .setup          = fio_netio_setup_splice,
466         .init           = fio_netio_init,
467         .cleanup        = fio_netio_cleanup,
468         .open_file      = fio_netio_open_file,
469         .close_file     = generic_close_file,
470         .flags          = FIO_SYNCIO | FIO_DISKLESSIO | FIO_UNIDIR |
471                           FIO_SIGQUIT,
472 };
473
474 static void fio_init fio_netio_register(void)
475 {
476         register_ioengine(&ioengine_rw);
477         register_ioengine(&ioengine_splice);
478 }
479
480 static void fio_exit fio_netio_unregister(void)
481 {
482         unregister_ioengine(&ioengine_rw);
483         unregister_ioengine(&ioengine_splice);
484 }