Support for 'netsplice' engine
[fio.git] / engines / net.c
1 /*
2  * net engine
3  *
4  * IO engine that reads/writes to/from sockets.
5  *
6  */
7 #include <stdio.h>
8 #include <stdlib.h>
9 #include <unistd.h>
10 #include <errno.h>
11 #include <assert.h>
12 #include <netinet/in.h>
13 #include <arpa/inet.h>
14 #include <netdb.h>
15 #include <sys/poll.h>
16
17 #include "../fio.h"
18
19 struct netio_data {
20         int listenfd;
21         int send_to_net;
22         int use_splice;
23         int pipes[2];
24         char host[64];
25         struct sockaddr_in addr;
26 };
27
28 static int fio_netio_prep(struct thread_data *td, struct io_u *io_u)
29 {
30         struct netio_data *nd = td->io_ops->data;
31         struct fio_file *f = io_u->file;
32
33         /*
34          * Make sure we don't see spurious reads to a receiver, and vice versa
35          */
36         if ((nd->send_to_net && io_u->ddir == DDIR_READ) ||
37             (!nd->send_to_net && io_u->ddir == DDIR_WRITE)) {
38                 td_verror(td, EINVAL, "bad direction");
39                 return 1;
40         }
41                 
42         if (io_u->ddir == DDIR_SYNC)
43                 return 0;
44         if (io_u->offset == f->last_completed_pos)
45                 return 0;
46
47         /*
48          * If offset is different from last end position, it's a seek.
49          * As network io is purely sequential, we don't allow seeks.
50          */
51         td_verror(td, EINVAL, "cannot seek");
52         return 1;
53 }
54
55 /*
56  * Receive bytes from a socket and fill them into the internal pipe
57  */
58 static int splice_in(struct thread_data *td, struct io_u *io_u)
59 {
60         struct netio_data *nd = td->io_ops->data;
61         unsigned int len = io_u->xfer_buflen;
62         struct fio_file *f = io_u->file;
63         int bytes = 0;
64
65         while (len) {
66                 int ret = splice(nd->pipes[1], NULL, f->fd, NULL, len, 0);
67
68                 if (ret < 0) {
69                         if (!bytes)
70                                 bytes = ret;
71
72                         break;
73                 } else if (!ret)
74                         break;
75
76                 bytes += ret;
77         }
78
79         return bytes;
80 }
81
82 /*
83  * Transmit 'len' bytes from the internal pipe
84  */
85 static int splice_out(struct thread_data *td, struct io_u *io_u,
86                       unsigned int len)
87 {
88         struct netio_data *nd = td->io_ops->data;
89         struct fio_file *f = io_u->file;
90         int bytes = 0;
91
92         while (len) {
93                 int ret = splice(nd->pipes[0], NULL, f->fd, NULL, len, 0);
94
95                 if (ret < 0) {
96                         if (!bytes)
97                                 bytes = ret;
98                         
99                         break;
100                 } else if (!ret)
101                         break;
102
103                 bytes += ret;
104                 len -= ret;
105         }
106
107         return bytes;
108 }
109
110 /*
111  * vmsplice() pipe to io_u buffer
112  */
113 static int vmsplice_io_u_out(struct thread_data *td, struct io_u *io_u,
114                              unsigned int len)
115 {
116         struct netio_data *nd = td->io_ops->data;
117         struct iovec iov = {
118                 .iov_base = io_u->xfer_buf,
119                 .iov_len = len,
120         };
121         int bytes = 0;
122
123         while (iov.iov_len) {
124                 int ret = vmsplice(nd->pipes[0], &iov, 1, 0);
125
126                 if (ret < 0) {
127                         if (!bytes)
128                                 bytes = ret;
129                         break;
130                 } else if (!ret)
131                         break;
132
133                 iov.iov_len -= ret;
134                 if (iov.iov_len)
135                         iov.iov_base += ret;
136         }
137
138         return bytes;
139 }
140
141 /*
142  * vmsplice() io_u to pipe
143  */
144 static int vmsplice_io_u_in(struct thread_data *td, struct io_u *io_u)
145 {
146         struct netio_data *nd = td->io_ops->data;
147         struct iovec iov = {
148                 .iov_base = io_u->xfer_buf,
149                 .iov_len = io_u->xfer_buflen,
150         };
151         unsigned int bytes = 0;
152
153         while (iov.iov_len) {
154                 int ret = vmsplice(nd->pipes[1], &iov, 1, 0);
155
156                 if (ret < 0)
157                         return -1;
158                 else if (!ret)
159                         return bytes;
160
161                 iov.iov_len -= ret;
162                 bytes += ret;
163                 if (iov.iov_len)
164                         iov.iov_base += ret;
165         }
166
167         return bytes;
168 }
169
170 static int fio_netio_splice_in(struct thread_data *td, struct io_u *io_u)
171 {
172         int ret;
173
174         ret = splice_in(td, io_u);
175         if (ret <= 0)
176                 return ret;
177
178         return vmsplice_io_u_out(td, io_u, ret);
179 }
180
181 static int fio_netio_splice_out(struct thread_data *td, struct io_u *io_u)
182 {
183         int ret;
184
185         ret = vmsplice_io_u_in(td, io_u);
186         if (ret <= 0)
187                 return ret;
188
189         return splice_out(td, io_u, ret);
190 }
191
192 static int fio_netio_send(struct thread_data *td, struct io_u *io_u)
193 {
194         int flags = 0;
195
196         /*
197          * if we are going to write more, set MSG_MORE
198          */
199         if (td->this_io_bytes[DDIR_WRITE] + io_u->xfer_buflen < td->o.size)
200                 flags = MSG_MORE;
201
202         return send(io_u->file->fd, io_u->xfer_buf, io_u->xfer_buflen, flags);
203 }
204
205 static int fio_netio_recv(struct io_u *io_u)
206 {
207         int flags = MSG_WAITALL;
208
209         return recv(io_u->file->fd, io_u->xfer_buf, io_u->xfer_buflen, flags);
210 }
211
212 static int fio_netio_queue(struct thread_data *td, struct io_u *io_u)
213 {
214         struct netio_data *nd = td->io_ops->data;
215         int ret;
216
217         if (io_u->ddir == DDIR_WRITE) {
218                 if (nd->use_splice)
219                         ret = fio_netio_splice_out(td, io_u);
220                 else
221                         ret = fio_netio_send(td, io_u);
222         } else if (io_u->ddir == DDIR_READ) {
223                 if (nd->use_splice)
224                         ret = fio_netio_splice_in(td, io_u);
225                 else
226                         ret = fio_netio_recv(io_u);
227         } else
228                 ret = 0;        /* must be a SYNC */
229
230         if (ret != (int) io_u->xfer_buflen) {
231                 if (ret >= 0) {
232                         io_u->resid = io_u->xfer_buflen - ret;
233                         io_u->error = 0;
234                         return FIO_Q_COMPLETED;
235                 } else
236                         io_u->error = errno;
237         }
238
239         if (io_u->error)
240                 td_verror(td, io_u->error, "xfer");
241
242         return FIO_Q_COMPLETED;
243 }
244
245 static int fio_netio_connect(struct thread_data *td, struct fio_file *f)
246 {
247         struct netio_data *nd = td->io_ops->data;
248
249         f->fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
250         if (f->fd < 0) {
251                 td_verror(td, errno, "socket");
252                 return 1;
253         }
254
255         if (connect(f->fd, (struct sockaddr *) &nd->addr, sizeof(nd->addr)) < 0) {
256                 td_verror(td, errno, "connect");
257                 return 1;
258         }
259
260         return 0;
261 }
262
263 static int fio_netio_accept(struct thread_data *td, struct fio_file *f)
264 {
265         struct netio_data *nd = td->io_ops->data;
266         socklen_t socklen = sizeof(nd->addr);
267         struct pollfd pfd;
268         int ret;
269
270         log_info("fio: waiting for connection\n");
271
272         /*
273          * Accept loop. poll for incoming events, accept them. Repeat until we
274          * have all connections.
275          */
276         while (!td->terminate) {
277                 pfd.fd = nd->listenfd;
278                 pfd.events = POLLIN;
279
280                 ret = poll(&pfd, 1, -1);
281                 if (ret < 0) {
282                         if (errno == EINTR)
283                                 continue;
284
285                         td_verror(td, errno, "poll");
286                         break;
287                 } else if (!ret)
288                         continue;
289
290                 /*
291                  * should be impossible
292                  */
293                 if (!(pfd.revents & POLLIN))
294                         continue;
295
296                 f->fd = accept(nd->listenfd, (struct sockaddr *) &nd->addr, &socklen);
297                 if (f->fd < 0) {
298                         td_verror(td, errno, "accept");
299                         return 1;
300                 }
301                 break;
302         }
303
304         return 0;
305 }
306
307
308 static int fio_netio_open_file(struct thread_data *td, struct fio_file *f)
309 {
310         if (td_read(td))
311                 return fio_netio_accept(td, f);
312         else
313                 return fio_netio_connect(td, f);
314 }
315
316 static int fio_netio_setup_connect(struct thread_data *td, const char *host,
317                                    unsigned short port)
318 {
319         struct netio_data *nd = td->io_ops->data;
320
321         nd->addr.sin_family = AF_INET;
322         nd->addr.sin_port = htons(port);
323
324         if (inet_aton(host, &nd->addr.sin_addr) != 1) {
325                 struct hostent *hent;
326
327                 hent = gethostbyname(host);
328                 if (!hent) {
329                         td_verror(td, errno, "gethostbyname");
330                         return 1;
331                 }
332
333                 memcpy(&nd->addr.sin_addr, hent->h_addr, 4);
334         }
335
336         return 0;
337 }
338
339 static int fio_netio_setup_listen(struct thread_data *td, short port)
340 {
341         struct netio_data *nd = td->io_ops->data;
342         int fd, opt;
343
344         fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
345         if (fd < 0) {
346                 td_verror(td, errno, "socket");
347                 return 1;
348         }
349
350         opt = 1;
351         if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) {
352                 td_verror(td, errno, "setsockopt");
353                 return 1;
354         }
355 #ifdef SO_REUSEPORT
356         if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt)) < 0) {
357                 td_verror(td, errno, "setsockopt");
358                 return 1;
359         }
360 #endif
361
362         nd->addr.sin_family = AF_INET;
363         nd->addr.sin_addr.s_addr = htonl(INADDR_ANY);
364         nd->addr.sin_port = htons(port);
365
366         if (bind(fd, (struct sockaddr *) &nd->addr, sizeof(nd->addr)) < 0) {
367                 td_verror(td, errno, "bind");
368                 return 1;
369         }
370         if (listen(fd, 1) < 0) {
371                 td_verror(td, errno, "listen");
372                 return 1;
373         }
374
375         nd->listenfd = fd;
376         return 0;
377 }
378
379 static int fio_netio_init(struct thread_data *td)
380 {
381         struct netio_data *nd = td->io_ops->data;
382         unsigned short port;
383         char host[64], buf[128];
384         char *sep;
385         int ret;
386
387         if (td_rw(td)) {
388                 log_err("fio: network connections must be read OR write\n");
389                 return 1;
390         }
391         if (td_random(td)) {
392                 log_err("fio: network IO can't be random\n");
393                 return 1;
394         }
395
396         strcpy(buf, td->o.filename);
397
398         sep = strchr(buf, '/');
399         if (!sep) {
400                 log_err("fio: bad network host/port <<%s>>\n", td->o.filename);
401                 return 1;
402         }
403
404         *sep = '\0';
405         sep++;
406         strcpy(host, buf);
407         port = atoi(sep);
408
409         if (td_read(td)) {
410                 nd->send_to_net = 0;
411                 ret = fio_netio_setup_listen(td, port);
412         } else {
413                 nd->send_to_net = 1;
414                 ret = fio_netio_setup_connect(td, host, port);
415         }
416
417         return ret;
418 }
419
420 static void fio_netio_cleanup(struct thread_data *td)
421 {
422         struct netio_data *nd = td->io_ops->data;
423
424         if (nd) {
425                 free(nd);
426                 td->io_ops->data = NULL;
427         }
428 }
429
430 static int fio_netio_setup(struct thread_data *td)
431 {
432         struct netio_data *nd;
433
434         if (!td->io_ops->data) {
435                 nd = malloc(sizeof(*nd));;
436
437                 memset(nd, 0, sizeof(*nd));
438                 nd->listenfd = -1;
439                 td->io_ops->data = nd;
440         }
441
442         return 0;
443 }
444
445 static int fio_netio_setup_splice(struct thread_data *td)
446 {
447         struct netio_data *nd;
448
449         fio_netio_setup(td);
450
451         nd = td->io_ops->data;
452         if (nd) {
453                 if (pipe(nd->pipes) < 0)
454                         return 1;
455
456                 nd->use_splice = 1;
457                 return 0;
458         }
459
460         return 1;
461 }
462
463 static struct ioengine_ops ioengine_rw = {
464         .name           = "net",
465         .version        = FIO_IOOPS_VERSION,
466         .prep           = fio_netio_prep,
467         .queue          = fio_netio_queue,
468         .setup          = fio_netio_setup,
469         .init           = fio_netio_init,
470         .cleanup        = fio_netio_cleanup,
471         .open_file      = fio_netio_open_file,
472         .close_file     = generic_close_file,
473         .flags          = FIO_SYNCIO | FIO_DISKLESSIO,
474 };
475
476 static struct ioengine_ops ioengine_splice = {
477         .name           = "netsplice",
478         .version        = FIO_IOOPS_VERSION,
479         .prep           = fio_netio_prep,
480         .queue          = fio_netio_queue,
481         .setup          = fio_netio_setup_splice,
482         .init           = fio_netio_init,
483         .cleanup        = fio_netio_cleanup,
484         .open_file      = fio_netio_open_file,
485         .close_file     = generic_close_file,
486         .flags          = FIO_SYNCIO | FIO_DISKLESSIO,
487 };
488
489 static void fio_init fio_netio_register(void)
490 {
491         register_ioengine(&ioengine_rw);
492         register_ioengine(&ioengine_splice);
493 }
494
495 static void fio_exit fio_netio_unregister(void)
496 {
497         unregister_ioengine(&ioengine_rw);
498         unregister_ioengine(&ioengine_splice);
499 }