2381f7390ea3d0d9934599409ce6ddbadb4ab5b1
[fio.git] / engines / net.c
1 /*
2  * Transfer data over the net.
3  */
4 #include <stdio.h>
5 #include <stdlib.h>
6 #include <unistd.h>
7 #include <errno.h>
8 #include <assert.h>
9 #include <netinet/in.h>
10 #include <arpa/inet.h>
11 #include <netdb.h>
12 #include <sys/poll.h>
13
14 #include "../fio.h"
15 #include "../os.h"
16
17 #define send_to_net(td) ((td)->io_ops->priv)
18
19 static int fio_netio_prep(struct thread_data *td, struct io_u *io_u)
20 {
21         struct fio_file *f = io_u->file;
22
23         /*
24          * Make sure we don't see spurious reads to a receiver, and vice versa
25          */
26         if ((send_to_net(td) && io_u->ddir == DDIR_READ) ||
27             (!send_to_net(td) && io_u->ddir == DDIR_WRITE)) {
28                 td_verror(td, EINVAL, "bad direction");
29                 return 1;
30         }
31                 
32         if (io_u->ddir == DDIR_SYNC)
33                 return 0;
34         if (io_u->offset == f->last_completed_pos)
35                 return 0;
36
37         /*
38          * If offset is different from last end position, it's a seek.
39          * As network io is purely sequential, we don't allow seeks.
40          */
41         td_verror(td, EINVAL, "cannot seek");
42         return 1;
43 }
44
45 static int fio_netio_queue(struct thread_data *td, struct io_u *io_u)
46 {
47         struct fio_file *f = io_u->file;
48         int ret, flags = 0;
49
50         if (io_u->ddir == DDIR_WRITE) {
51                 /*
52                  * if we are going to write more, set MSG_MORE
53                  */
54                 if (td->this_io_bytes[DDIR_WRITE] + io_u->xfer_buflen <
55                     td->io_size)
56                         flags = MSG_MORE;
57
58                 ret = send(f->fd, io_u->xfer_buf, io_u->xfer_buflen, flags);
59         } else if (io_u->ddir == DDIR_READ) {
60                 flags = MSG_WAITALL;
61                 ret = recv(f->fd, io_u->xfer_buf, io_u->xfer_buflen, flags);
62         } else
63                 ret = 0;        /* must be a SYNC */
64
65         if (ret != (int) io_u->xfer_buflen) {
66                 if (ret >= 0) {
67                         io_u->resid = io_u->xfer_buflen - ret;
68                         io_u->error = 0;
69                         return FIO_Q_COMPLETED;
70                 } else
71                         io_u->error = errno;
72         }
73
74         if (io_u->error)
75                 td_verror(td, io_u->error, "xfer");
76
77         return FIO_Q_COMPLETED;
78 }
79
80 static int fio_netio_setup_connect(struct thread_data *td, const char *host,
81                                    unsigned short port)
82 {
83         struct sockaddr_in addr;
84         struct fio_file *f;
85         int i;
86
87         memset(&addr, 0, sizeof(addr));
88         addr.sin_family = AF_INET;
89         addr.sin_port = htons(port);
90
91         if (inet_aton(host, &addr.sin_addr) != 1) {
92                 struct hostent *hent;
93
94                 hent = gethostbyname(host);
95                 if (!hent) {
96                         td_verror(td, errno, "gethostbyname");
97                         return 1;
98                 }
99
100                 memcpy(&addr.sin_addr, hent->h_addr, 4);
101         }
102
103         for_each_file(td, f, i) {
104                 f->fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
105                 if (f->fd < 0) {
106                         td_verror(td, errno, "socket");
107                         return 1;
108                 }
109
110                 if (connect(f->fd, (struct sockaddr *) &addr, sizeof(addr)) < 0) {
111                         td_verror(td, errno, "connect");
112                         return 1;
113                 }
114         }
115
116         return 0;
117
118 }
119
120 static int fio_netio_accept_connections(struct thread_data *td, int fd,
121                                         struct sockaddr_in *addr)
122 {
123         socklen_t socklen = sizeof(*addr);
124         unsigned int accepts = 0;
125         struct pollfd pfd;
126
127         fprintf(f_out, "fio: waiting for %u connections\n", td->nr_files);
128
129         /*
130          * Accept loop. poll for incoming events, accept them. Repeat until we
131          * have all connections.
132          */
133         while (!td->terminate && accepts < td->nr_files) {
134                 struct fio_file *f;
135                 int ret, i;
136
137                 pfd.fd = fd;
138                 pfd.events = POLLIN;
139
140                 ret = poll(&pfd, 1, -1);
141                 if (ret < 0) {
142                         if (errno == EINTR)
143                                 continue;
144
145                         td_verror(td, errno, "poll");
146                         break;
147                 } else if (!ret)
148                         continue;
149
150                 /*
151                  * should be impossible
152                  */
153                 if (!(pfd.revents & POLLIN))
154                         continue;
155
156                 for_each_file(td, f, i) {
157                         if (f->fd != -1)
158                                 continue;
159
160                         f->fd = accept(fd, (struct sockaddr *) addr, &socklen);
161                         if (f->fd < 0) {
162                                 td_verror(td, errno, "accept");
163                                 return 1;
164                         }
165                         accepts++;
166                         break;
167                 }
168         }
169
170         return 0;
171 }
172
173 static int fio_netio_setup_listen(struct thread_data *td, unsigned short port)
174 {
175         struct sockaddr_in addr;
176         int fd, opt;
177
178         fd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
179         if (fd < 0) {
180                 td_verror(td, errno, "socket");
181                 return 1;
182         }
183
184         opt = 1;
185         if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) {
186                 td_verror(td, errno, "setsockopt");
187                 return 1;
188         }
189 #ifdef SO_REUSEPORT
190         if (setsockopt(fd, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt)) < 0) {
191                 td_verror(td, errno, "setsockopt");
192                 return 1;
193         }
194 #endif
195
196         memset(&addr, 0, sizeof(addr));
197         addr.sin_family = AF_INET;
198         addr.sin_addr.s_addr = htonl(INADDR_ANY);
199         addr.sin_port = htons(port);
200
201         if (bind(fd, (struct sockaddr *) &addr, sizeof(addr)) < 0) {
202                 td_verror(td, errno, "bind");
203                 return 1;
204         }
205         if (listen(fd, 1) < 0) {
206                 td_verror(td, errno, "listen");
207                 return 1;
208         }
209
210         return fio_netio_accept_connections(td, fd, &addr);
211 }
212
213 static int fio_netio_init(struct thread_data *td)
214 {
215         char host[64], buf[128];
216         unsigned short port;
217         struct fio_file *f;
218         char *sep;
219         int ret, i;
220
221         if (!td->total_file_size) {
222                 log_err("fio: need size= set\n");
223                 return 1;
224         }
225
226         if (td_rw(td)) {
227                 log_err("fio: network connections must be read OR write\n");
228                 return 1;
229         }
230
231         strcpy(buf, td->filename);
232
233         sep = strchr(buf, ':');
234         if (!sep) {
235                 log_err("fio: bad network host:port <<%s>>\n", td->filename);
236                 return 1;
237         }
238
239         *sep = '\0';
240         sep++;
241         strcpy(host, buf);
242         port = atoi(sep);
243
244         if (td_read(td)) {
245                 send_to_net(td) = 0;
246                 ret = fio_netio_setup_listen(td, port);
247         } else {
248                 send_to_net(td) = 1;
249                 ret = fio_netio_setup_connect(td, host, port);
250         }
251
252         if (ret)
253                 return ret;
254
255         td->io_size = td->total_file_size;
256         td->total_io_size = td->io_size;
257
258         for_each_file(td, f, i) {
259                 f->file_size = td->total_file_size / td->nr_files;
260                 f->real_file_size = f->file_size;
261         }
262
263         td->nr_open_files = td->nr_files;
264         return 0;
265 }
266
267 static int fio_netio_setup(struct thread_data fio_unused *td)
268 {
269         return 0;
270 }
271
272 static struct ioengine_ops ioengine = {
273         .name           = "net",
274         .version        = FIO_IOOPS_VERSION,
275         .prep           = fio_netio_prep,
276         .queue          = fio_netio_queue,
277         .setup          = fio_netio_setup,
278         .init           = fio_netio_init,
279         .flags          = FIO_SYNCIO | FIO_DISKLESSIO | FIO_SELFOPEN,
280 };
281
282 static void fio_init fio_netio_register(void)
283 {
284         register_ioengine(&ioengine);
285 }
286
287 static void fio_exit fio_netio_unregister(void)
288 {
289         unregister_ioengine(&ioengine);
290 }