server: cleanup and proper error returns
[fio.git] / server.c
CommitLineData
50d16976
JA
1#include <stdio.h>
2#include <stdlib.h>
142575e6 3#include <stdarg.h>
50d16976
JA
4#include <unistd.h>
5#include <limits.h>
50d16976 6#include <errno.h>
50d16976 7#include <sys/poll.h>
50d16976
JA
8#include <sys/types.h>
9#include <sys/wait.h>
d05c4a03 10#include <sys/socket.h>
87aa8f19
JA
11#include <sys/stat.h>
12#include <sys/un.h>
1f51bba9 13#include <sys/uio.h>
50d16976
JA
14#include <netinet/in.h>
15#include <arpa/inet.h>
16#include <netdb.h>
e46d8091 17#include <syslog.h>
9e22ecb0 18#include <signal.h>
3989b143 19#ifdef CONFIG_ZLIB
1b42725f 20#include <zlib.h>
3989b143 21#endif
50d16976
JA
22
23#include "fio.h"
296ab20a 24#include "options.h"
132159a5 25#include "server.h"
fcee5ff6 26#include "crc/crc16.h"
c7c6cb4c 27#include "lib/ieee754.h"
ca09be4b
JA
28#include "verify.h"
29#include "smalloc.h"
89cf1480 30
5adc2447 31int fio_net_port = FIO_NET_PORT;
50d16976 32
009b1be4
JA
33int exit_backend = 0;
34
f77d4b73
JA
/*
 * Per-entry flags for queued outgoing commands (struct sk_entry):
 *   SK_F_FREE   - buffer is released with free() once the entry is done
 *   SK_F_COPY   - buffer is a private smalloc() copy, released with sfree()
 *   SK_F_SIMPLE - header-only command, sent without a payload
 *   SK_F_VEC    - entry heads a chain of entries sent as fragments
 */
enum {
	SK_F_FREE	= 1 << 0,
	SK_F_COPY	= 1 << 1,
	SK_F_SIMPLE	= 1 << 2,
	SK_F_VEC	= 1 << 3,
};
41
42struct sk_entry {
43 struct flist_head list;
44 int opcode;
45 void *buf;
46 off_t size;
47 uint64_t *tagptr;
48 int flags;
02594e37 49 struct flist_head next;
f77d4b73
JA
50};
51
52struct sk_out {
53 int sk;
54 struct fio_mutex *lock;
55 struct flist_head list;
56 struct fio_mutex *wait;
57};
58
87aa8f19
JA
/* Listening/bind configuration */
static char *fio_server_arg;
static char *bind_sock;
static struct sockaddr_in saddr_in;
static struct sockaddr_in6 saddr_in6;
static int use_ipv6;

/* has_zlib: compiled with zlib support; use_zlib: negotiated in the probe */
#ifdef CONFIG_ZLIB
static unsigned int has_zlib = 1;
#else
static unsigned int has_zlib = 0;
#endif
static unsigned int use_zlib;

/* Server name as supplied by the client in its probe command */
static char me[128];

/* Thread-specific key under which the current struct sk_out is looked up */
static pthread_key_t sk_out_key;
73
122c7725
JA
74struct fio_fork_item {
75 struct flist_head list;
76 int exitval;
77 int signal;
78 int exited;
79 pid_t pid;
80};
81
ca09be4b
JA
82struct cmd_reply {
83 struct fio_mutex lock;
84 void *data;
85 size_t size;
86 int error;
87};
88
89c1707c
JA
89static const char *fio_server_ops[FIO_NET_CMD_NR] = {
90 "",
91 "QUIT",
92 "EXIT",
93 "JOB",
94 "JOBLINE",
95 "TEXT",
96 "TS",
97 "GS",
98 "SEND_ETA",
99 "ETA",
100 "PROBE",
101 "START",
d09a64a0
JA
102 "STOP",
103 "DISK_UTIL",
b9d2f30a 104 "SERVER_START",
807f9971 105 "ADD_JOB",
ca09be4b
JA
106 "RUN",
107 "IOLOG",
108 "UPDATE_JOB",
109 "LOAD_FILE",
110 "VTRIGGER",
111 "SENDFILE",
89c1707c
JA
112};
113
f77d4b73
JA
114static void sk_lock(struct sk_out *sk_out)
115{
116 fio_mutex_down(sk_out->lock);
117}
118
119static void sk_unlock(struct sk_out *sk_out)
120{
121 fio_mutex_up(sk_out->lock);
122}
123
89c1707c
JA
124const char *fio_server_op(unsigned int op)
125{
126 static char buf[32];
127
128 if (op < FIO_NET_CMD_NR)
129 return fio_server_ops[op];
130
131 sprintf(buf, "UNKNOWN/%d", op);
132 return buf;
133}
134
/* Sum the iov_len fields of the first 'count' entries of 'iov'. */
static ssize_t iov_total_len(const struct iovec *iov, int count)
{
	const struct iovec *cur = iov;
	ssize_t total = 0;
	int left;

	for (left = count; left; left--, cur++)
		total += cur->iov_len;

	return total;
}
132159a5 146
5235f626
JA
147static int fio_sendv_data(int sk, struct iovec *iov, int count)
148{
149 ssize_t total_len = iov_total_len(iov, count);
150 ssize_t ret;
151
152 do {
153 ret = writev(sk, iov, count);
132159a5 154 if (ret > 0) {
5235f626
JA
155 total_len -= ret;
156 if (!total_len)
132159a5 157 break;
5235f626
JA
158
159 while (ret) {
160 if (ret >= iov->iov_len) {
161 ret -= iov->iov_len;
162 iov++;
163 continue;
164 }
165 iov->iov_base += ret;
166 iov->iov_len -= ret;
167 ret = 0;
168 }
132159a5
JA
169 } else if (!ret)
170 break;
171 else if (errno == EAGAIN || errno == EINTR)
172 continue;
7b821684
JA
173 else
174 break;
132159a5
JA
175 } while (!exit_backend);
176
5235f626 177 if (!total_len)
132159a5
JA
178 return 0;
179
180 return 1;
181}
182
5235f626
JA
183int fio_send_data(int sk, const void *p, unsigned int len)
184{
185 struct iovec iov = { .iov_base = (void *) p, .iov_len = len };
186
187 assert(len <= sizeof(struct fio_net_cmd) + FIO_SERVER_MAX_FRAGMENT_PDU);
188
189 return fio_sendv_data(sk, &iov, 1);
190}
191
132159a5
JA
192int fio_recv_data(int sk, void *p, unsigned int len)
193{
194 do {
195 int ret = recv(sk, p, len, MSG_WAITALL);
196
197 if (ret > 0) {
198 len -= ret;
199 if (!len)
200 break;
201 p += ret;
202 continue;
203 } else if (!ret)
204 break;
205 else if (errno == EAGAIN || errno == EINTR)
206 continue;
7b821684
JA
207 else
208 break;
132159a5
JA
209 } while (!exit_backend);
210
211 if (!len)
212 return 0;
213
214 return -1;
215}
216
217static int verify_convert_cmd(struct fio_net_cmd *cmd)
218{
fcee5ff6 219 uint16_t crc;
132159a5 220
fcee5ff6
JA
221 cmd->cmd_crc16 = le16_to_cpu(cmd->cmd_crc16);
222 cmd->pdu_crc16 = le16_to_cpu(cmd->pdu_crc16);
132159a5 223
25dfa848 224 crc = fio_crc16(cmd, FIO_NET_CMD_CRC_SZ);
fcee5ff6 225 if (crc != cmd->cmd_crc16) {
132159a5 226 log_err("fio: server bad crc on command (got %x, wanted %x)\n",
fcee5ff6 227 cmd->cmd_crc16, crc);
132159a5
JA
228 return 1;
229 }
230
231 cmd->version = le16_to_cpu(cmd->version);
232 cmd->opcode = le16_to_cpu(cmd->opcode);
233 cmd->flags = le32_to_cpu(cmd->flags);
af9c9fb3 234 cmd->tag = le64_to_cpu(cmd->tag);
132159a5
JA
235 cmd->pdu_len = le32_to_cpu(cmd->pdu_len);
236
237 switch (cmd->version) {
fa2ea806 238 case FIO_SERVER_VER:
132159a5
JA
239 break;
240 default:
241 log_err("fio: bad server cmd version %d\n", cmd->version);
242 return 1;
243 }
244
b9d2f30a 245 if (cmd->pdu_len > FIO_SERVER_MAX_FRAGMENT_PDU) {
132159a5
JA
246 log_err("fio: command payload too large: %u\n", cmd->pdu_len);
247 return 1;
248 }
249
250 return 0;
251}
252
a64e88da
JA
253/*
254 * Read (and defragment, if necessary) incoming commands
255 */
e951bdc4 256struct fio_net_cmd *fio_net_recv_cmd(int sk)
132159a5 257{
dfbf1f6f 258 struct fio_net_cmd cmd, *tmp, *cmdret = NULL;
a64e88da 259 size_t cmd_size = 0, pdu_offset = 0;
fcee5ff6 260 uint16_t crc;
a64e88da
JA
261 int ret, first = 1;
262 void *pdu = NULL;
132159a5 263
a64e88da
JA
264 do {
265 ret = fio_recv_data(sk, &cmd, sizeof(cmd));
266 if (ret)
267 break;
132159a5 268
a64e88da
JA
269 /* We have a command, verify it and swap if need be */
270 ret = verify_convert_cmd(&cmd);
271 if (ret)
272 break;
132159a5 273
0b8f30a5
JA
274 if (first) {
275 /* if this is text, add room for \0 at the end */
276 cmd_size = sizeof(cmd) + cmd.pdu_len + 1;
277 assert(!cmdret);
278 } else
a64e88da 279 cmd_size += cmd.pdu_len;
132159a5 280
dfbf1f6f
JA
281 if (cmd_size / 1024 > FIO_SERVER_MAX_CMD_MB * 1024) {
282 log_err("fio: cmd+pdu too large (%llu)\n", (unsigned long long) cmd_size);
283 ret = 1;
284 break;
285 }
286
287 tmp = realloc(cmdret, cmd_size);
288 if (!tmp) {
289 log_err("fio: server failed allocating cmd\n");
290 ret = 1;
291 break;
292 }
293 cmdret = tmp;
132159a5 294
a64e88da
JA
295 if (first)
296 memcpy(cmdret, &cmd, sizeof(cmd));
67f15dcf
JA
297 else if (cmdret->opcode != cmd.opcode) {
298 log_err("fio: fragment opcode mismatch (%d != %d)\n",
299 cmdret->opcode, cmd.opcode);
300 ret = 1;
301 break;
302 }
a64e88da
JA
303
304 if (!cmd.pdu_len)
305 break;
306
307 /* There's payload, get it */
308 pdu = (void *) cmdret->payload + pdu_offset;
309 ret = fio_recv_data(sk, pdu, cmd.pdu_len);
310 if (ret)
311 break;
312
313 /* Verify payload crc */
25dfa848 314 crc = fio_crc16(pdu, cmd.pdu_len);
a64e88da
JA
315 if (crc != cmd.pdu_crc16) {
316 log_err("fio: server bad crc on payload ");
317 log_err("(got %x, wanted %x)\n", cmd.pdu_crc16, crc);
318 ret = 1;
319 break;
320 }
321
322 pdu_offset += cmd.pdu_len;
817f06bb
JA
323 if (!first)
324 cmdret->pdu_len += cmd.pdu_len;
a64e88da
JA
325 first = 0;
326 } while (cmd.flags & FIO_NET_CMD_F_MORE);
132159a5 327
a64e88da
JA
328 if (ret) {
329 free(cmdret);
330 cmdret = NULL;
0b8f30a5
JA
331 } else if (cmdret) {
332 /* zero-terminate text input */
084d1c6f
JA
333 if (cmdret->pdu_len) {
334 if (cmdret->opcode == FIO_NET_CMD_TEXT) {
8a1db9a1
JA
335 struct cmd_text_pdu *__pdu = (struct cmd_text_pdu *) cmdret->payload;
336 char *buf = (char *) __pdu->buf;
0b8f30a5 337
8a1db9a1 338 buf[__pdu->buf_len] = '\0';
084d1c6f 339 } else if (cmdret->opcode == FIO_NET_CMD_JOB) {
8a1db9a1
JA
340 struct cmd_job_pdu *__pdu = (struct cmd_job_pdu *) cmdret->payload;
341 char *buf = (char *) __pdu->buf;
342 int len = le32_to_cpu(__pdu->buf_len);
084d1c6f 343
46bcd498 344 buf[len] = '\0';
084d1c6f 345 }
0b8f30a5 346 }
084d1c6f 347
0b8f30a5 348 /* frag flag is internal */
a64e88da 349 cmdret->flags &= ~FIO_NET_CMD_F_MORE;
0b8f30a5 350 }
a64e88da
JA
351
352 return cmdret;
132159a5
JA
353}
354
40c60516
JA
355static void add_reply(uint64_t tag, struct flist_head *list)
356{
a572bbfb 357 struct fio_net_cmd_reply *reply;
40c60516 358
a572bbfb 359 reply = (struct fio_net_cmd_reply *) (uintptr_t) tag;
40c60516
JA
360 flist_add_tail(&reply->list, list);
361}
362
363static uint64_t alloc_reply(uint64_t tag, uint16_t opcode)
364{
365 struct fio_net_cmd_reply *reply;
366
367 reply = calloc(1, sizeof(*reply));
368 INIT_FLIST_HEAD(&reply->list);
c7b09783 369 fio_gettime(&reply->tv, NULL);
40c60516
JA
370 reply->saved_tag = tag;
371 reply->opcode = opcode;
372
373 return (uintptr_t) reply;
374}
375
/* Release the reply tracker whose address is encoded in 'tag'. */
static void free_reply(uint64_t tag)
{
	free((struct fio_net_cmd_reply *) (uintptr_t) tag);
}
383
53bd8dbc 384void fio_net_cmd_crc_pdu(struct fio_net_cmd *cmd, const void *pdu)
132159a5
JA
385{
386 uint32_t pdu_len;
387
25dfa848 388 cmd->cmd_crc16 = __cpu_to_le16(fio_crc16(cmd, FIO_NET_CMD_CRC_SZ));
132159a5
JA
389
390 pdu_len = le32_to_cpu(cmd->pdu_len);
1b42725f
JA
391 cmd->pdu_crc16 = __cpu_to_le16(fio_crc16(pdu, pdu_len));
392}
393
394void fio_net_cmd_crc(struct fio_net_cmd *cmd)
395{
396 fio_net_cmd_crc_pdu(cmd, cmd->payload);
132159a5
JA
397}
398
af9c9fb3 399int fio_net_send_cmd(int fd, uint16_t opcode, const void *buf, off_t size,
40c60516 400 uint64_t *tagptr, struct flist_head *list)
794d69ca 401{
7f868316
JA
402 struct fio_net_cmd *cmd = NULL;
403 size_t this_len, cur_len = 0;
40c60516 404 uint64_t tag;
794d69ca
JA
405 int ret;
406
40c60516
JA
407 if (list) {
408 assert(tagptr);
409 tag = *tagptr = alloc_reply(*tagptr, opcode);
410 } else
411 tag = tagptr ? *tagptr : 0;
412
794d69ca
JA
413 do {
414 this_len = size;
b9d2f30a
JA
415 if (this_len > FIO_SERVER_MAX_FRAGMENT_PDU)
416 this_len = FIO_SERVER_MAX_FRAGMENT_PDU;
794d69ca 417
7f868316
JA
418 if (!cmd || cur_len < sizeof(*cmd) + this_len) {
419 if (cmd)
420 free(cmd);
421
422 cur_len = sizeof(*cmd) + this_len;
423 cmd = malloc(cur_len);
424 }
794d69ca 425
af9c9fb3 426 fio_init_net_cmd(cmd, opcode, buf, this_len, tag);
794d69ca
JA
427
428 if (this_len < size)
ddcc0b69 429 cmd->flags = __cpu_to_le32(FIO_NET_CMD_F_MORE);
794d69ca
JA
430
431 fio_net_cmd_crc(cmd);
432
433 ret = fio_send_data(fd, cmd, sizeof(*cmd) + this_len);
794d69ca
JA
434 size -= this_len;
435 buf += this_len;
436 } while (!ret && size);
437
40c60516
JA
438 if (list) {
439 if (ret)
440 free_reply(tag);
441 else
442 add_reply(tag, list);
443 }
444
7f868316
JA
445 if (cmd)
446 free(cmd);
447
794d69ca
JA
448 return ret;
449}
450
02594e37
JA
451struct sk_entry *fio_net_prep_cmd(uint16_t opcode, void *buf, off_t size,
452 uint64_t *tagptr, int flags)
f77d4b73 453{
f77d4b73
JA
454 struct sk_entry *entry;
455
456 entry = smalloc(sizeof(*entry));
02594e37 457 INIT_FLIST_HEAD(&entry->next);
f77d4b73
JA
458 entry->opcode = opcode;
459 if (flags & SK_F_COPY) {
460 entry->buf = smalloc(size);
461 memcpy(entry->buf, buf, size);
462 } else
463 entry->buf = buf;
464 entry->size = size;
465 entry->tagptr = tagptr;
466 entry->flags = flags;
467
02594e37
JA
468 return entry;
469}
470
471static void fio_net_queue_entry(struct sk_entry *entry)
472{
473 struct sk_out *sk_out = pthread_getspecific(sk_out_key);
474
f77d4b73
JA
475 sk_lock(sk_out);
476 flist_add_tail(&entry->list, &sk_out->list);
477 sk_unlock(sk_out);
478
479 fio_mutex_up(sk_out->wait);
02594e37 480}
f77d4b73 481
02594e37
JA
482static int fio_net_queue_cmd(uint16_t opcode, void *buf, off_t size,
483 uint64_t *tagptr, int flags)
484{
485 struct sk_entry *entry;
486
487 entry = fio_net_prep_cmd(opcode, buf, size, tagptr, flags);
488 fio_net_queue_entry(entry);
f77d4b73
JA
489 return 0;
490}
491
89c1707c 492static int fio_net_send_simple_stack_cmd(int sk, uint16_t opcode, uint64_t tag)
132159a5 493{
178cde9f 494 struct fio_net_cmd cmd;
132159a5 495
af9c9fb3 496 fio_init_net_cmd(&cmd, opcode, NULL, 0, tag);
132159a5
JA
497 fio_net_cmd_crc(&cmd);
498
499 return fio_send_data(sk, &cmd, sizeof(cmd));
500}
501
89c1707c
JA
/*
 * Send a payload-less command. When 'list' is given, a reply tracker is
 * allocated first and used as the tag; it is added to 'list' once the
 * command went out, or released again if sending failed.
 *
 * Returns 0 on success, the send error otherwise.
 */
int fio_net_send_simple_cmd(int sk, uint16_t opcode, uint64_t tag,
			    struct flist_head *list)
{
	int err;

	/* untracked command: nothing to allocate or clean up */
	if (!list)
		return fio_net_send_simple_stack_cmd(sk, opcode, tag);

	tag = alloc_reply(tag, opcode);

	err = fio_net_send_simple_stack_cmd(sk, opcode, tag);
	if (err) {
		free_reply(tag);
		return err;
	}

	add_reply(tag, list);
	return 0;
}
527
f77d4b73
JA
528static int fio_net_queue_quit(void)
529{
530 dprint(FD_NET, "server: sending quit\n");
531
532 return fio_net_queue_cmd(FIO_NET_CMD_QUIT, NULL, 0, 0, SK_F_SIMPLE);
533}
534
122c7725 535int fio_net_send_quit(int sk)
437377e1 536{
46c48f1f 537 dprint(FD_NET, "server: sending quit\n");
122c7725 538
8c95307b 539 return fio_net_send_simple_cmd(sk, FIO_NET_CMD_QUIT, 0, NULL);
437377e1
JA
540}
541
f77d4b73 542static int fio_net_send_ack(struct fio_net_cmd *cmd, int error, int signal)
132159a5 543{
11e950bd 544 struct cmd_end_pdu epdu;
f58bd2a4 545 uint64_t tag = 0;
122c7725 546
f58bd2a4
JA
547 if (cmd)
548 tag = cmd->tag;
122c7725
JA
549
550 epdu.error = __cpu_to_le32(error);
551 epdu.signal = __cpu_to_le32(signal);
f77d4b73 552 return fio_net_queue_cmd(FIO_NET_CMD_STOP, &epdu, sizeof(epdu), &tag, SK_F_COPY);
f58bd2a4
JA
553}
554
f77d4b73 555static int fio_net_queue_stop(int error, int signal)
f58bd2a4
JA
556{
557 dprint(FD_NET, "server: sending stop (%d, %d)\n", error, signal);
f77d4b73 558 return fio_net_send_ack(NULL, error, signal);
122c7725
JA
559}
560
561static void fio_server_add_fork_item(pid_t pid, struct flist_head *list)
562{
563 struct fio_fork_item *ffi;
564
565 ffi = malloc(sizeof(*ffi));
566 ffi->exitval = 0;
567 ffi->signal = 0;
568 ffi->exited = 0;
569 ffi->pid = pid;
570 flist_add_tail(&ffi->list, list);
571}
572
6348b5de 573static void fio_server_add_conn_pid(struct flist_head *conn_list, pid_t pid)
122c7725 574{
a7533dbd 575 dprint(FD_NET, "server: forked off connection job (pid=%u)\n", (int) pid);
6348b5de 576 fio_server_add_fork_item(pid, conn_list);
122c7725
JA
577}
578
6348b5de 579static void fio_server_add_job_pid(struct flist_head *job_list, pid_t pid)
122c7725 580{
a7533dbd 581 dprint(FD_NET, "server: forked off job job (pid=%u)\n", (int) pid);
6348b5de 582 fio_server_add_fork_item(pid, job_list);
122c7725
JA
583}
584
585static void fio_server_check_fork_item(struct fio_fork_item *ffi)
586{
587 int ret, status;
588
589 ret = waitpid(ffi->pid, &status, WNOHANG);
590 if (ret < 0) {
591 if (errno == ECHILD) {
a7533dbd 592 log_err("fio: connection pid %u disappeared\n", (int) ffi->pid);
122c7725
JA
593 ffi->exited = 1;
594 } else
595 log_err("fio: waitpid: %s\n", strerror(errno));
596 } else if (ret == ffi->pid) {
597 if (WIFSIGNALED(status)) {
598 ffi->signal = WTERMSIG(status);
599 ffi->exited = 1;
600 }
601 if (WIFEXITED(status)) {
602 if (WEXITSTATUS(status))
603 ffi->exitval = WEXITSTATUS(status);
604 ffi->exited = 1;
605 }
606 }
607}
608
f77d4b73 609static void fio_server_fork_item_done(struct fio_fork_item *ffi, bool stop)
122c7725 610{
a7533dbd 611 dprint(FD_NET, "pid %u exited, sig=%u, exitval=%d\n", (int) ffi->pid, ffi->signal, ffi->exitval);
122c7725
JA
612
613 /*
614 * Fold STOP and QUIT...
615 */
f77d4b73
JA
616 if (stop) {
617 fio_net_queue_stop(ffi->exitval, ffi->signal);
618 fio_net_queue_quit();
619 }
620
122c7725
JA
621 flist_del(&ffi->list);
622 free(ffi);
623}
624
f77d4b73 625static void fio_server_check_fork_items(struct flist_head *list, bool stop)
122c7725
JA
626{
627 struct flist_head *entry, *tmp;
628 struct fio_fork_item *ffi;
629
630 flist_for_each_safe(entry, tmp, list) {
631 ffi = flist_entry(entry, struct fio_fork_item, list);
632
633 fio_server_check_fork_item(ffi);
634
635 if (ffi->exited)
f77d4b73 636 fio_server_fork_item_done(ffi, stop);
122c7725
JA
637 }
638}
639
/* Reap finished job children; the client is sent STOP/QUIT for each. */
static void fio_server_check_jobs(struct flist_head *job_list)
{
	const bool notify_client = true;

	fio_server_check_fork_items(job_list, notify_client);
}
644
/* Reap finished connection children; nothing is sent for these. */
static void fio_server_check_conns(struct flist_head *conn_list)
{
	const bool notify_client = false;

	fio_server_check_fork_items(conn_list, notify_client);
}
649
323255cc
JA
650static int handle_load_file_cmd(struct fio_net_cmd *cmd)
651{
652 struct cmd_load_file_pdu *pdu = (struct cmd_load_file_pdu *) cmd->payload;
653 void *file_name = pdu->file;
654 struct cmd_start_pdu spdu;
655
656 dprint(FD_NET, "server: loading local file %s\n", (char *) file_name);
657
658 pdu->name_len = le16_to_cpu(pdu->name_len);
659 pdu->client_type = le16_to_cpu(pdu->client_type);
660
661 if (parse_jobs_ini(file_name, 0, 0, pdu->client_type)) {
f77d4b73 662 fio_net_queue_quit();
323255cc
JA
663 return -1;
664 }
665
666 spdu.jobs = cpu_to_le32(thread_number);
667 spdu.stat_outputs = cpu_to_le32(stat_number);
f77d4b73 668 fio_net_queue_cmd(FIO_NET_CMD_START, &spdu, sizeof(spdu), NULL, SK_F_COPY);
323255cc
JA
669 return 0;
670}
671
f77d4b73
JA
672static int handle_run_cmd(struct sk_out *sk_out, struct flist_head *job_list,
673 struct fio_net_cmd *cmd)
122c7725 674{
f77d4b73 675 struct backend_data data;
122c7725 676 pid_t pid;
a64e88da 677 int ret;
132159a5 678
cbb1303c 679 fio_time_init();
122c7725 680 set_genesis_time();
11e950bd 681
122c7725
JA
682 pid = fork();
683 if (pid) {
6348b5de 684 fio_server_add_job_pid(job_list, pid);
122c7725
JA
685 return 0;
686 }
11e950bd 687
f77d4b73
JA
688 data.key = sk_out_key;
689 data.ptr = sk_out;
690 //pthread_setspecific(sk_out_key, sk_out);
691 ret = fio_backend(&data);
2bb3f0a7 692 free_threads_shm();
122c7725 693 _exit(ret);
81179eec
JA
694}
695
b9d2f30a
JA
696static int handle_job_cmd(struct fio_net_cmd *cmd)
697{
46bcd498
JA
698 struct cmd_job_pdu *pdu = (struct cmd_job_pdu *) cmd->payload;
699 void *buf = pdu->buf;
b9d2f30a
JA
700 struct cmd_start_pdu spdu;
701
46bcd498
JA
702 pdu->buf_len = le32_to_cpu(pdu->buf_len);
703 pdu->client_type = le32_to_cpu(pdu->client_type);
704
705 if (parse_jobs_ini(buf, 1, 0, pdu->client_type)) {
f77d4b73 706 fio_net_queue_quit();
b9d2f30a
JA
707 return -1;
708 }
709
710 spdu.jobs = cpu_to_le32(thread_number);
108fea77 711 spdu.stat_outputs = cpu_to_le32(stat_number);
f77d4b73
JA
712
713 fio_net_queue_cmd(FIO_NET_CMD_START, &spdu, sizeof(spdu), NULL, SK_F_COPY);
b9d2f30a
JA
714 return 0;
715}
716
81179eec
JA
717static int handle_jobline_cmd(struct fio_net_cmd *cmd)
718{
fa2ea806
JA
719 void *pdu = cmd->payload;
720 struct cmd_single_line_pdu *cslp;
721 struct cmd_line_pdu *clp;
722 unsigned long offset;
b9d2f30a 723 struct cmd_start_pdu spdu;
fa2ea806 724 char **argv;
b9d2f30a 725 int i;
81179eec 726
fa2ea806
JA
727 clp = pdu;
728 clp->lines = le16_to_cpu(clp->lines);
46bcd498 729 clp->client_type = le16_to_cpu(clp->client_type);
fa2ea806
JA
730 argv = malloc(clp->lines * sizeof(char *));
731 offset = sizeof(*clp);
81179eec 732
fa2ea806 733 dprint(FD_NET, "server: %d command line args\n", clp->lines);
39e8e016 734
fa2ea806
JA
735 for (i = 0; i < clp->lines; i++) {
736 cslp = pdu + offset;
737 argv[i] = (char *) cslp->text;
738
739 offset += sizeof(*cslp) + le16_to_cpu(cslp->len);
39e8e016
JA
740 dprint(FD_NET, "server: %d: %s\n", i, argv[i]);
741 }
81179eec 742
46bcd498 743 if (parse_cmd_line(clp->lines, argv, clp->client_type)) {
f77d4b73 744 fio_net_queue_quit();
fa2ea806 745 free(argv);
81179eec 746 return -1;
e6d1c668 747 }
81179eec 748
fa2ea806
JA
749 free(argv);
750
b9d2f30a 751 spdu.jobs = cpu_to_le32(thread_number);
1e5324e7 752 spdu.stat_outputs = cpu_to_le32(stat_number);
f77d4b73
JA
753
754 fio_net_queue_cmd(FIO_NET_CMD_START, &spdu, sizeof(spdu), NULL, SK_F_COPY);
b9d2f30a 755 return 0;
132159a5
JA
756}
757
c28e8e8c
JA
758static int handle_probe_cmd(struct fio_net_cmd *cmd)
759{
3989b143
JA
760 struct cmd_client_probe_pdu *pdu = (struct cmd_client_probe_pdu *) cmd->payload;
761 struct cmd_probe_reply_pdu probe;
40c60516 762 uint64_t tag = cmd->tag;
c28e8e8c 763
89c1707c
JA
764 dprint(FD_NET, "server: sending probe reply\n");
765
ca09be4b
JA
766 strcpy(me, (char *) pdu->server);
767
c28e8e8c
JA
768 memset(&probe, 0, sizeof(probe));
769 gethostname((char *) probe.hostname, sizeof(probe.hostname));
0dcebdf4 770#ifdef CONFIG_BIG_ENDIAN
6eb24791
JA
771 probe.bigendian = 1;
772#endif
750db473 773 strncpy((char *) probe.fio_version, fio_version_string, sizeof(probe.fio_version));
c28e8e8c 774
cca84643
JA
775 probe.os = FIO_OS;
776 probe.arch = FIO_ARCH;
38fdef22 777 probe.bpp = sizeof(void *);
d31e26d0 778 probe.cpus = __cpu_to_le32(cpus_online());
3989b143
JA
779
780 /*
781 * If the client supports compression and we do too, then enable it
782 */
783 if (has_zlib && le64_to_cpu(pdu->flags) & FIO_PROBE_FLAG_ZLIB) {
784 probe.flags = __cpu_to_le64(FIO_PROBE_FLAG_ZLIB);
785 use_zlib = 1;
786 } else {
787 probe.flags = 0;
788 use_zlib = 0;
789 }
38fdef22 790
f77d4b73 791 return fio_net_queue_cmd(FIO_NET_CMD_PROBE, &probe, sizeof(probe), &tag, SK_F_COPY);
af9c9fb3
JA
792}
793
794static int handle_send_eta_cmd(struct fio_net_cmd *cmd)
795{
796 struct jobs_eta *je;
40c60516 797 uint64_t tag = cmd->tag;
723297c9 798 size_t size;
af9c9fb3
JA
799 int i;
800
e123c833
JA
801 dprint(FD_NET, "server sending status\n");
802
803 /*
804 * Fake ETA return if we don't have a local one, otherwise the client
805 * will end up timing out waiting for a response to the ETA request
806 */
c5103619 807 je = get_jobs_eta(true, &size);
b2bee93a
JA
808 if (!je) {
809 size = sizeof(*je);
810 je = calloc(1, size);
811 } else {
e123c833
JA
812 je->nr_running = cpu_to_le32(je->nr_running);
813 je->nr_ramp = cpu_to_le32(je->nr_ramp);
814 je->nr_pending = cpu_to_le32(je->nr_pending);
815 je->nr_setting_up = cpu_to_le32(je->nr_setting_up);
816 je->files_open = cpu_to_le32(je->files_open);
817
818 for (i = 0; i < DDIR_RWDIR_CNT; i++) {
819 je->m_rate[i] = cpu_to_le32(je->m_rate[i]);
820 je->t_rate[i] = cpu_to_le32(je->t_rate[i]);
821 je->m_iops[i] = cpu_to_le32(je->m_iops[i]);
822 je->t_iops[i] = cpu_to_le32(je->t_iops[i]);
823 je->rate[i] = cpu_to_le32(je->rate[i]);
824 je->iops[i] = cpu_to_le32(je->iops[i]);
825 }
af9c9fb3 826
e123c833
JA
827 je->elapsed_sec = cpu_to_le64(je->elapsed_sec);
828 je->eta_sec = cpu_to_le64(je->eta_sec);
829 je->nr_threads = cpu_to_le32(je->nr_threads);
830 je->is_pow2 = cpu_to_le32(je->is_pow2);
831 je->unit_base = cpu_to_le32(je->unit_base);
af9c9fb3
JA
832 }
833
f77d4b73 834 fio_net_queue_cmd(FIO_NET_CMD_ETA, je, size, &tag, SK_F_FREE);
af9c9fb3 835 return 0;
c28e8e8c
JA
836}
837
f77d4b73 838static int send_update_job_reply(uint64_t __tag, int error)
40c60516
JA
839{
840 uint64_t tag = __tag;
841 uint32_t pdu_error;
842
843 pdu_error = __cpu_to_le32(error);
f77d4b73 844 return fio_net_queue_cmd(FIO_NET_CMD_UPDATE_JOB, &pdu_error, sizeof(pdu_error), &tag, SK_F_COPY);
40c60516
JA
845}
846
f58bd2a4
JA
847static int handle_update_job_cmd(struct fio_net_cmd *cmd)
848{
849 struct cmd_add_job_pdu *pdu = (struct cmd_add_job_pdu *) cmd->payload;
850 struct thread_data *td;
851 uint32_t tnumber;
852
853 tnumber = le32_to_cpu(pdu->thread_number);
854
855 dprint(FD_NET, "server: updating options for job %u\n", tnumber);
856
30ffacbf 857 if (!tnumber || tnumber > thread_number) {
f77d4b73 858 send_update_job_reply(cmd->tag, ENODEV);
f58bd2a4
JA
859 return 0;
860 }
861
30ffacbf 862 td = &threads[tnumber - 1];
f58bd2a4 863 convert_thread_options_to_cpu(&td->o, &pdu->top);
f77d4b73 864 send_update_job_reply(cmd->tag, 0);
f58bd2a4
JA
865 return 0;
866}
867
ca09be4b
JA
868static int handle_trigger_cmd(struct fio_net_cmd *cmd)
869{
870 struct cmd_vtrigger_pdu *pdu = (struct cmd_vtrigger_pdu *) cmd->payload;
871 char *buf = (char *) pdu->cmd;
872 struct all_io_list *rep;
873 size_t sz;
874
875 pdu->len = le16_to_cpu(pdu->len);
876 buf[pdu->len] = '\0';
877
878 rep = get_all_io_list(IO_LIST_ALL, &sz);
879 if (!rep) {
880 struct all_io_list state;
881
882 state.threads = cpu_to_le64((uint64_t) 0);
f77d4b73
JA
883 fio_net_queue_cmd(FIO_NET_CMD_VTRIGGER, &state, sizeof(state), NULL, SK_F_COPY);
884 } else
885 fio_net_queue_cmd(FIO_NET_CMD_VTRIGGER, rep, sz, NULL, SK_F_FREE);
ca09be4b
JA
886
887 exec_trigger(buf);
888 return 0;
889}
890
f77d4b73
JA
891static int handle_command(struct sk_out *sk_out, struct flist_head *job_list,
892 struct fio_net_cmd *cmd)
132159a5
JA
893{
894 int ret;
895
4b91ee8f
JA
896 dprint(FD_NET, "server: got op [%s], pdu=%u, tag=%llx\n",
897 fio_server_op(cmd->opcode), cmd->pdu_len,
898 (unsigned long long) cmd->tag);
46c48f1f 899
132159a5
JA
900 switch (cmd->opcode) {
901 case FIO_NET_CMD_QUIT:
cc0df00a 902 fio_terminate_threads(TERMINATE_ALL);
f77d4b73
JA
903 ret = 0;
904 break;
d7959186 905 case FIO_NET_CMD_EXIT:
132159a5 906 exit_backend = 1;
c28e8e8c 907 return -1;
323255cc
JA
908 case FIO_NET_CMD_LOAD_FILE:
909 ret = handle_load_file_cmd(cmd);
910 break;
132159a5 911 case FIO_NET_CMD_JOB:
0b8f30a5 912 ret = handle_job_cmd(cmd);
132159a5 913 break;
81179eec
JA
914 case FIO_NET_CMD_JOBLINE:
915 ret = handle_jobline_cmd(cmd);
916 break;
c28e8e8c
JA
917 case FIO_NET_CMD_PROBE:
918 ret = handle_probe_cmd(cmd);
919 break;
af9c9fb3
JA
920 case FIO_NET_CMD_SEND_ETA:
921 ret = handle_send_eta_cmd(cmd);
922 break;
b9d2f30a 923 case FIO_NET_CMD_RUN:
f77d4b73 924 ret = handle_run_cmd(sk_out, job_list, cmd);
b9d2f30a 925 break;
f58bd2a4
JA
926 case FIO_NET_CMD_UPDATE_JOB:
927 ret = handle_update_job_cmd(cmd);
928 break;
ca09be4b
JA
929 case FIO_NET_CMD_VTRIGGER:
930 ret = handle_trigger_cmd(cmd);
931 break;
932 case FIO_NET_CMD_SENDFILE: {
933 struct cmd_sendfile_reply *in;
934 struct cmd_reply *rep;
935
936 rep = (struct cmd_reply *) (uintptr_t) cmd->tag;
937
938 in = (struct cmd_sendfile_reply *) cmd->payload;
939 in->size = le32_to_cpu(in->size);
940 in->error = le32_to_cpu(in->error);
941 if (in->error) {
942 ret = 1;
943 rep->error = in->error;
944 } else {
945 ret = 0;
946 rep->data = smalloc(in->size);
947 if (!rep->data) {
948 ret = 1;
949 rep->error = ENOMEM;
950 } else {
951 rep->size = in->size;
952 memcpy(rep->data, in->data, in->size);
953 }
954 }
955 fio_mutex_up(&rep->lock);
956 break;
957 }
132159a5 958 default:
3c3ed070 959 log_err("fio: unknown opcode: %s\n", fio_server_op(cmd->opcode));
132159a5
JA
960 ret = 1;
961 }
962
963 return ret;
964}
965
02594e37
JA
966/*
967 * Send a command with a separate PDU, not inlined in the command
968 */
969static int fio_send_cmd_ext_pdu(int sk, uint16_t opcode, const void *buf,
970 off_t size, uint64_t tag, uint32_t flags)
971{
972 struct fio_net_cmd cmd;
973 struct iovec iov[2];
974
975 iov[0].iov_base = (void *) &cmd;
976 iov[0].iov_len = sizeof(cmd);
977 iov[1].iov_base = (void *) buf;
978 iov[1].iov_len = size;
979
980 __fio_init_net_cmd(&cmd, opcode, size, tag);
981 cmd.flags = __cpu_to_le32(flags);
982 fio_net_cmd_crc_pdu(&cmd, buf);
983
984 return fio_sendv_data(sk, iov, 2);
985}
986
987static void finish_entry(struct sk_entry *entry)
988{
989 if (entry->flags & SK_F_FREE)
990 free(entry->buf);
991 else if (entry->flags & SK_F_COPY)
992 sfree(entry->buf);
993
994 sfree(entry);
995}
996
b3882320
JA
997static void entry_set_flags_tag(struct sk_entry *entry, struct flist_head *list,
998 unsigned int *flags, uint64_t *tag)
02594e37 999{
b3882320
JA
1000 if (!flist_empty(list))
1001 *flags = FIO_NET_CMD_F_MORE;
02594e37 1002 else
b3882320 1003 *flags = 0;
02594e37 1004
b3882320
JA
1005 if (entry->tagptr)
1006 *tag = *entry->tagptr;
02594e37 1007 else
b3882320
JA
1008 *tag = 0;
1009}
02594e37 1010
b3882320
JA
1011static int send_vec_entry(struct sk_out *sk_out, struct sk_entry *first)
1012{
1013 unsigned int flags;
1014 uint64_t tag;
1015 int ret;
1016
1017 entry_set_flags_tag(first, &first->next, &flags, &tag);
1018
1019 ret = fio_send_cmd_ext_pdu(sk_out->sk, first->opcode, first->buf, first->size, tag, flags);
02594e37
JA
1020
1021 while (!flist_empty(&first->next)) {
1022 struct sk_entry *next;
1023
1024 next = flist_first_entry(&first->next, struct sk_entry, list);
1025 flist_del_init(&next->list);
02594e37 1026
b3882320 1027 entry_set_flags_tag(next, &first->next, &flags, &tag);
02594e37 1028
b3882320 1029 ret += fio_send_cmd_ext_pdu(sk_out->sk, next->opcode, next->buf, next->size, tag, flags);
02594e37
JA
1030 finish_entry(next);
1031 }
b3882320
JA
1032
1033 return ret;
02594e37
JA
1034}
1035
b3882320 1036static int handle_sk_entry(struct sk_out *sk_out, struct sk_entry *entry)
f77d4b73 1037{
b3882320
JA
1038 int ret;
1039
02594e37 1040 if (entry->flags & SK_F_VEC)
b3882320 1041 ret = send_vec_entry(sk_out, entry);
f77d4b73
JA
1042 if (entry->flags & SK_F_SIMPLE) {
1043 uint64_t tag = 0;
1044
1045 if (entry->tagptr)
1046 tag = *entry->tagptr;
1047
b3882320 1048 ret = fio_net_send_simple_cmd(sk_out->sk, entry->opcode, tag, NULL);
f77d4b73 1049 } else
b3882320
JA
1050 ret = fio_net_send_cmd(sk_out->sk, entry->opcode, entry->buf, entry->size, entry->tagptr, NULL);
1051
1052 if (ret)
1053 log_err("fio: failed handling cmd %s\n", fio_server_op(entry->opcode));
f77d4b73 1054
02594e37 1055 finish_entry(entry);
b3882320 1056 return ret;
f77d4b73
JA
1057}
1058
b3882320 1059static int handle_xmits(struct sk_out *sk_out)
f77d4b73
JA
1060{
1061 struct sk_entry *entry;
1062 FLIST_HEAD(list);
b3882320 1063 int ret = 0;
f77d4b73
JA
1064
1065 sk_lock(sk_out);
1066 if (flist_empty(&sk_out->list)) {
1067 sk_unlock(sk_out);
b3882320 1068 return 0;
f77d4b73
JA
1069 }
1070
1071 flist_splice_init(&sk_out->list, &list);
1072 sk_unlock(sk_out);
1073
1074 while (!flist_empty(&list)) {
1075 entry = flist_entry(list.next, struct sk_entry, list);
1076 flist_del(&entry->list);
b3882320 1077 ret += handle_sk_entry(sk_out, entry);
f77d4b73 1078 }
b3882320
JA
1079
1080 return ret;
f77d4b73
JA
1081}
1082
1083static int handle_connection(struct sk_out *sk_out)
132159a5
JA
1084{
1085 struct fio_net_cmd *cmd = NULL;
6348b5de 1086 FLIST_HEAD(job_list);
132159a5
JA
1087 int ret = 0;
1088
122c7725 1089 reset_fio_state();
122c7725 1090
132159a5
JA
1091 /* read forever */
1092 while (!exit_backend) {
e951bdc4 1093 struct pollfd pfd = {
f77d4b73 1094 .fd = sk_out->sk,
e951bdc4
JA
1095 .events = POLLIN,
1096 };
1097
1098 ret = 0;
1099 do {
54163255
JA
1100 int timeout = 1000;
1101
1102 if (!flist_empty(&job_list))
1103 timeout = 100;
3c3ed070 1104
f77d4b73
JA
1105 handle_xmits(sk_out);
1106
1107 ret = poll(&pfd, 1, 0);
e951bdc4
JA
1108 if (ret < 0) {
1109 if (errno == EINTR)
1110 break;
1111 log_err("fio: poll: %s\n", strerror(errno));
1112 break;
19c65179 1113 } else if (!ret) {
6348b5de 1114 fio_server_check_jobs(&job_list);
f77d4b73 1115 fio_mutex_down_timeout(sk_out->wait, timeout);
e951bdc4 1116 continue;
19c65179 1117 }
e951bdc4
JA
1118
1119 if (pfd.revents & POLLIN)
1120 break;
1121 if (pfd.revents & (POLLERR|POLLHUP)) {
1122 ret = 1;
1123 break;
1124 }
19c65179 1125 } while (!exit_backend);
e951bdc4 1126
6348b5de 1127 fio_server_check_jobs(&job_list);
122c7725 1128
e951bdc4
JA
1129 if (ret < 0)
1130 break;
1131
f77d4b73 1132 cmd = fio_net_recv_cmd(sk_out->sk);
132159a5 1133 if (!cmd) {
c28e8e8c 1134 ret = -1;
132159a5
JA
1135 break;
1136 }
1137
f77d4b73 1138 ret = handle_command(sk_out, &job_list, cmd);
132159a5
JA
1139 if (ret)
1140 break;
1141
1142 free(cmd);
c77a99e7 1143 cmd = NULL;
132159a5
JA
1144 }
1145
1146 if (cmd)
1147 free(cmd);
1148
f77d4b73
JA
1149 handle_xmits(sk_out);
1150
1151 close(sk_out->sk);
122c7725 1152 _exit(ret);
cc0df00a
JA
1153}
1154
296ab20a
BE
1155/* get the address on this host bound by the input socket,
1156 * whether it is ipv6 or ipv4 */
1157
f77d4b73 1158int get_my_addr_str(int sk)
296ab20a 1159{
f77d4b73
JA
1160 struct sockaddr_in6 myaddr6 = { 0, };
1161 struct sockaddr_in myaddr4 = { 0, };
1162 struct sockaddr *sockaddr_p;
1163 char *net_addr;
1164 socklen_t len;
1165 int ret;
296ab20a 1166
f77d4b73
JA
1167 if (use_ipv6) {
1168 len = sizeof(myaddr6);
296ab20a 1169 sockaddr_p = (struct sockaddr * )&myaddr6;
f77d4b73
JA
1170 net_addr = (char * )&myaddr6.sin6_addr;
1171 } else {
1172 len = sizeof(myaddr4);
296ab20a 1173 sockaddr_p = (struct sockaddr * )&myaddr4;
f77d4b73
JA
1174 net_addr = (char * )&myaddr4.sin_addr;
1175 }
1176
296ab20a
BE
1177 ret = getsockname(sk, sockaddr_p, &len);
1178 if (ret) {
1179 log_err("fio: getsockaddr: %s\n", strerror(errno));
296ab20a
BE
1180 return -1;
1181 }
f77d4b73
JA
1182
1183 if (!inet_ntop(use_ipv6?AF_INET6:AF_INET, net_addr, client_sockaddr_str, INET6_ADDRSTRLEN - 1)) {
296ab20a 1184 log_err("inet_ntop: failed to convert addr to string\n");
296ab20a
BE
1185 return -1;
1186 }
f77d4b73 1187
296ab20a
BE
1188 dprint(FD_NET, "fio server bound to addr %s\n", client_sockaddr_str);
1189 return 0;
1190}
1191
50d16976
JA
1192static int accept_loop(int listen_sk)
1193{
bb447a27 1194 struct sockaddr_in addr;
479471c4
JA
1195 struct sockaddr_in6 addr6;
1196 socklen_t len = use_ipv6 ? sizeof(addr6) : sizeof(addr);
009b1be4 1197 struct pollfd pfd;
4a851614 1198 int ret = 0, sk, exitval = 0;
f77d4b73 1199 struct sk_out *sk_out;
6348b5de 1200 FLIST_HEAD(conn_list);
50d16976 1201
60efd14e
JA
1202 dprint(FD_NET, "server enter accept loop\n");
1203
4a851614 1204 fio_set_fd_nonblocking(listen_sk, "server");
009b1be4 1205
f77d4b73
JA
1206 sk_out = smalloc(sizeof(*sk_out));
1207 INIT_FLIST_HEAD(&sk_out->list);
1208 sk_out->lock = fio_mutex_init(FIO_MUTEX_UNLOCKED);
1209 sk_out->wait = fio_mutex_init(FIO_MUTEX_LOCKED);
1210
1211 pthread_setspecific(sk_out_key, sk_out);
1212
122c7725 1213 while (!exit_backend) {
479471c4
JA
1214 const char *from;
1215 char buf[64];
122c7725 1216 pid_t pid;
009b1be4 1217
122c7725
JA
1218 pfd.fd = listen_sk;
1219 pfd.events = POLLIN;
1220 do {
54163255
JA
1221 int timeout = 1000;
1222
1223 if (!flist_empty(&conn_list))
1224 timeout = 100;
1225
1226 ret = poll(&pfd, 1, timeout);
122c7725
JA
1227 if (ret < 0) {
1228 if (errno == EINTR)
1229 break;
1230 log_err("fio: poll: %s\n", strerror(errno));
1231 break;
1232 } else if (!ret) {
6348b5de 1233 fio_server_check_conns(&conn_list);
122c7725
JA
1234 continue;
1235 }
009b1be4 1236
122c7725
JA
1237 if (pfd.revents & POLLIN)
1238 break;
1239 } while (!exit_backend);
50d16976 1240
6348b5de 1241 fio_server_check_conns(&conn_list);
46c48f1f 1242
122c7725
JA
1243 if (exit_backend || ret < 0)
1244 break;
1245
479471c4
JA
1246 if (use_ipv6)
1247 sk = accept(listen_sk, (struct sockaddr *) &addr6, &len);
1248 else
1249 sk = accept(listen_sk, (struct sockaddr *) &addr, &len);
1250
122c7725
JA
1251 if (sk < 0) {
1252 log_err("fio: accept: %s\n", strerror(errno));
1253 return -1;
1254 }
37db14fe 1255
479471c4
JA
1256 if (use_ipv6)
1257 from = inet_ntop(AF_INET6, (struct sockaddr *) &addr6.sin6_addr, buf, sizeof(buf));
1258 else
1259 from = inet_ntop(AF_INET, (struct sockaddr *) &addr.sin_addr, buf, sizeof(buf));
1260
1261 dprint(FD_NET, "server: connect from %s\n", from);
50d16976 1262
f77d4b73
JA
1263 sk_out->sk = sk;
1264
122c7725
JA
1265 pid = fork();
1266 if (pid) {
1267 close(sk);
6348b5de 1268 fio_server_add_conn_pid(&conn_list, pid);
f77d4b73 1269 pthread_setspecific(sk_out_key, sk_out);
122c7725
JA
1270 continue;
1271 }
5c341e9a 1272
122c7725 1273 /* exits */
68d96e51 1274 get_my_addr_str(sk); /* if error, it's already logged, non-fatal */
f77d4b73 1275 handle_connection(sk_out);
122c7725 1276 }
5c341e9a 1277
f77d4b73
JA
1278#if 0
1279 fio_mutex_remove(sk_out->lock);
1280 fio_mutex_remove(sk_out->wait);
1281 sfree(sk_out);
1282 pthread_setspecific(sk_out_key, NULL);
1283#endif
1284
132159a5 1285 return exitval;
50d16976
JA
1286}
1287
084d1c6f 1288int fio_server_text_output(int level, const char *buf, size_t len)
37db14fe 1289{
f77d4b73 1290 struct sk_out *sk_out = pthread_getspecific(sk_out_key);
084d1c6f
JA
1291 struct cmd_text_pdu *pdu;
1292 unsigned int tlen;
1293 struct timeval tv;
1294
f77d4b73 1295 if (!sk_out || sk_out->sk == -1)
408e0b90 1296 return -1;
084d1c6f
JA
1297
1298 tlen = sizeof(*pdu) + len;
1299 pdu = malloc(tlen);
1300
1301 pdu->level = __cpu_to_le32(level);
1302 pdu->buf_len = __cpu_to_le32(len);
1303
1304 gettimeofday(&tv, NULL);
1305 pdu->log_sec = __cpu_to_le64(tv.tv_sec);
1306 pdu->log_usec = __cpu_to_le64(tv.tv_usec);
337d75a8 1307
084d1c6f
JA
1308 memcpy(pdu->buf, buf, len);
1309
f77d4b73 1310 fio_net_queue_cmd(FIO_NET_CMD_TEXT, pdu, tlen, NULL, SK_F_COPY);
084d1c6f
JA
1311 free(pdu);
1312 return len;
142575e6
JA
1313}
1314
a64e88da
JA
1315static void convert_io_stat(struct io_stat *dst, struct io_stat *src)
1316{
1317 dst->max_val = cpu_to_le64(src->max_val);
1318 dst->min_val = cpu_to_le64(src->min_val);
1319 dst->samples = cpu_to_le64(src->samples);
802ad4a8
JA
1320
1321 /*
1322 * Encode to IEEE 754 for network transfer
1323 */
e5c9093d
JA
1324 dst->mean.u.i = cpu_to_le64(fio_double_to_uint64(src->mean.u.f));
1325 dst->S.u.i = cpu_to_le64(fio_double_to_uint64(src->S.u.f));
a64e88da
JA
1326}
1327
1328static void convert_gs(struct group_run_stats *dst, struct group_run_stats *src)
1329{
1330 int i;
1331
298921d6 1332 for (i = 0; i < DDIR_RWDIR_CNT; i++) {
a64e88da
JA
1333 dst->max_run[i] = cpu_to_le64(src->max_run[i]);
1334 dst->min_run[i] = cpu_to_le64(src->min_run[i]);
1335 dst->max_bw[i] = cpu_to_le64(src->max_bw[i]);
1336 dst->min_bw[i] = cpu_to_le64(src->min_bw[i]);
1337 dst->io_kb[i] = cpu_to_le64(src->io_kb[i]);
1338 dst->agg[i] = cpu_to_le64(src->agg[i]);
1339 }
1340
1341 dst->kb_base = cpu_to_le32(src->kb_base);
ad705bcb 1342 dst->unit_base = cpu_to_le32(src->unit_base);
a64e88da 1343 dst->groupid = cpu_to_le32(src->groupid);
771e58be 1344 dst->unified_rw_rep = cpu_to_le32(src->unified_rw_rep);
a64e88da
JA
1345}
1346
1347/*
1348 * Send a CMD_TS, which packs struct thread_stat and group_run_stats
1349 * into a single payload.
1350 */
1351void fio_server_send_ts(struct thread_stat *ts, struct group_run_stats *rs)
1352{
1353 struct cmd_ts_pdu p;
1354 int i, j;
1355
60efd14e
JA
1356 dprint(FD_NET, "server sending end stats\n");
1357
317b3c8b
JA
1358 memset(&p, 0, sizeof(p));
1359
4e59d0f3
JA
1360 strncpy(p.ts.name, ts->name, FIO_JOBNAME_SIZE - 1);
1361 strncpy(p.ts.verror, ts->verror, FIO_VERROR_SIZE - 1);
1362 strncpy(p.ts.description, ts->description, FIO_JOBDESC_SIZE - 1);
a64e88da 1363
2f122b13
JA
1364 p.ts.error = cpu_to_le32(ts->error);
1365 p.ts.thread_number = cpu_to_le32(ts->thread_number);
1366 p.ts.groupid = cpu_to_le32(ts->groupid);
1367 p.ts.pid = cpu_to_le32(ts->pid);
1368 p.ts.members = cpu_to_le32(ts->members);
771e58be 1369 p.ts.unified_rw_rep = cpu_to_le32(ts->unified_rw_rep);
a64e88da 1370
298921d6 1371 for (i = 0; i < DDIR_RWDIR_CNT; i++) {
a64e88da
JA
1372 convert_io_stat(&p.ts.clat_stat[i], &ts->clat_stat[i]);
1373 convert_io_stat(&p.ts.slat_stat[i], &ts->slat_stat[i]);
1374 convert_io_stat(&p.ts.lat_stat[i], &ts->lat_stat[i]);
1375 convert_io_stat(&p.ts.bw_stat[i], &ts->bw_stat[i]);
1376 }
1377
1378 p.ts.usr_time = cpu_to_le64(ts->usr_time);
1379 p.ts.sys_time = cpu_to_le64(ts->sys_time);
1380 p.ts.ctx = cpu_to_le64(ts->ctx);
1381 p.ts.minf = cpu_to_le64(ts->minf);
1382 p.ts.majf = cpu_to_le64(ts->majf);
1383 p.ts.clat_percentiles = cpu_to_le64(ts->clat_percentiles);
988d97ba 1384 p.ts.percentile_precision = cpu_to_le64(ts->percentile_precision);
802ad4a8
JA
1385
1386 for (i = 0; i < FIO_IO_U_LIST_MAX_LEN; i++) {
cfc03e46
JA
1387 fio_fp64_t *src = &ts->percentile_list[i];
1388 fio_fp64_t *dst = &p.ts.percentile_list[i];
802ad4a8 1389
e5c9093d 1390 dst->u.i = cpu_to_le64(fio_double_to_uint64(src->u.f));
802ad4a8 1391 }
a64e88da
JA
1392
1393 for (i = 0; i < FIO_IO_U_MAP_NR; i++) {
1394 p.ts.io_u_map[i] = cpu_to_le32(ts->io_u_map[i]);
1395 p.ts.io_u_submit[i] = cpu_to_le32(ts->io_u_submit[i]);
1396 p.ts.io_u_complete[i] = cpu_to_le32(ts->io_u_complete[i]);
1397 }
1398
1399 for (i = 0; i < FIO_IO_U_LAT_U_NR; i++) {
1400 p.ts.io_u_lat_u[i] = cpu_to_le32(ts->io_u_lat_u[i]);
1401 p.ts.io_u_lat_m[i] = cpu_to_le32(ts->io_u_lat_m[i]);
1402 }
1403
298921d6 1404 for (i = 0; i < DDIR_RWDIR_CNT; i++)
a64e88da
JA
1405 for (j = 0; j < FIO_IO_U_PLAT_NR; j++)
1406 p.ts.io_u_plat[i][j] = cpu_to_le32(ts->io_u_plat[i][j]);
1407
78799deb 1408 for (i = 0; i < DDIR_RWDIR_CNT; i++) {
a64e88da 1409 p.ts.total_io_u[i] = cpu_to_le64(ts->total_io_u[i]);
93eee04a 1410 p.ts.short_io_u[i] = cpu_to_le64(ts->short_io_u[i]);
3bcb9d94 1411 p.ts.drop_io_u[i] = cpu_to_le64(ts->drop_io_u[i]);
a64e88da
JA
1412 }
1413
93eee04a 1414 p.ts.total_submit = cpu_to_le64(ts->total_submit);
a64e88da
JA
1415 p.ts.total_complete = cpu_to_le64(ts->total_complete);
1416
298921d6 1417 for (i = 0; i < DDIR_RWDIR_CNT; i++) {
a64e88da
JA
1418 p.ts.io_bytes[i] = cpu_to_le64(ts->io_bytes[i]);
1419 p.ts.runtime[i] = cpu_to_le64(ts->runtime[i]);
1420 }
1421
1422 p.ts.total_run_time = cpu_to_le64(ts->total_run_time);
1423 p.ts.continue_on_error = cpu_to_le16(ts->continue_on_error);
1424 p.ts.total_err_count = cpu_to_le64(ts->total_err_count);
ddcc0b69
JA
1425 p.ts.first_error = cpu_to_le32(ts->first_error);
1426 p.ts.kb_base = cpu_to_le32(ts->kb_base);
ad705bcb 1427 p.ts.unit_base = cpu_to_le32(ts->unit_base);
a64e88da 1428
3e260a46
JA
1429 p.ts.latency_depth = cpu_to_le32(ts->latency_depth);
1430 p.ts.latency_target = cpu_to_le64(ts->latency_target);
1431 p.ts.latency_window = cpu_to_le64(ts->latency_window);
e5c9093d 1432 p.ts.latency_percentile.u.i = cpu_to_le64(fio_double_to_uint64(ts->latency_percentile.u.f));
3e260a46 1433
66347cfa
DE
1434 p.ts.nr_block_infos = le64_to_cpu(ts->nr_block_infos);
1435 for (i = 0; i < p.ts.nr_block_infos; i++)
1436 p.ts.block_infos[i] = le32_to_cpu(ts->block_infos[i]);
1437
a64e88da
JA
1438 convert_gs(&p.rs, rs);
1439
f77d4b73 1440 fio_net_queue_cmd(FIO_NET_CMD_TS, &p, sizeof(p), NULL, SK_F_COPY);
a64e88da
JA
1441}
1442
1443void fio_server_send_gs(struct group_run_stats *rs)
1444{
1445 struct group_run_stats gs;
1446
60efd14e
JA
1447 dprint(FD_NET, "server sending group run stats\n");
1448
a64e88da 1449 convert_gs(&gs, rs);
f77d4b73 1450 fio_net_queue_cmd(FIO_NET_CMD_GS, &gs, sizeof(gs), NULL, SK_F_COPY);
cf451d1e
JA
1451}
1452
d09a64a0
JA
1453static void convert_agg(struct disk_util_agg *dst, struct disk_util_agg *src)
1454{
1455 int i;
1456
1457 for (i = 0; i < 2; i++) {
504bc961
JA
1458 dst->ios[i] = cpu_to_le64(src->ios[i]);
1459 dst->merges[i] = cpu_to_le64(src->merges[i]);
d09a64a0 1460 dst->sectors[i] = cpu_to_le64(src->sectors[i]);
504bc961 1461 dst->ticks[i] = cpu_to_le64(src->ticks[i]);
d09a64a0
JA
1462 }
1463
504bc961
JA
1464 dst->io_ticks = cpu_to_le64(src->io_ticks);
1465 dst->time_in_queue = cpu_to_le64(src->time_in_queue);
d09a64a0 1466 dst->slavecount = cpu_to_le32(src->slavecount);
e5c9093d 1467 dst->max_util.u.i = cpu_to_le64(fio_double_to_uint64(src->max_util.u.f));
d09a64a0
JA
1468}
1469
1470static void convert_dus(struct disk_util_stat *dst, struct disk_util_stat *src)
1471{
1472 int i;
1473
59140421
JA
1474 dst->name[FIO_DU_NAME_SZ - 1] = '\0';
1475 strncpy((char *) dst->name, (char *) src->name, FIO_DU_NAME_SZ - 1);
d09a64a0
JA
1476
1477 for (i = 0; i < 2; i++) {
504bc961
JA
1478 dst->s.ios[i] = cpu_to_le64(src->s.ios[i]);
1479 dst->s.merges[i] = cpu_to_le64(src->s.merges[i]);
a3b4cf7d 1480 dst->s.sectors[i] = cpu_to_le64(src->s.sectors[i]);
504bc961 1481 dst->s.ticks[i] = cpu_to_le64(src->s.ticks[i]);
d09a64a0
JA
1482 }
1483
504bc961
JA
1484 dst->s.io_ticks = cpu_to_le64(src->s.io_ticks);
1485 dst->s.time_in_queue = cpu_to_le64(src->s.time_in_queue);
a3b4cf7d 1486 dst->s.msec = cpu_to_le64(src->s.msec);
d09a64a0
JA
1487}
1488
1489void fio_server_send_du(void)
1490{
1491 struct disk_util *du;
1492 struct flist_head *entry;
1493 struct cmd_du_pdu pdu;
1494
1495 dprint(FD_NET, "server: sending disk_util %d\n", !flist_empty(&disk_list));
1496
0766d92e
JA
1497 memset(&pdu, 0, sizeof(pdu));
1498
d09a64a0
JA
1499 flist_for_each(entry, &disk_list) {
1500 du = flist_entry(entry, struct disk_util, list);
1501
1502 convert_dus(&pdu.dus, &du->dus);
1503 convert_agg(&pdu.agg, &du->agg);
1504
f77d4b73 1505 fio_net_queue_cmd(FIO_NET_CMD_DU, &pdu, sizeof(pdu), NULL, SK_F_COPY);
d09a64a0
JA
1506 }
1507}
1508
/*
 * Compress the samples of 'log' with zlib and append the output to
 * first->next as a chain of FIO_NET_CMD_IOLOG entries.
 *
 * Dirty - since the log is potentially huge, compress it into
 * FIO_SERVER_MAX_FRAGMENT_PDU chunks and let the receiving side
 * defragment it.
 *
 * Every fragment gets its own buffer, because the entries are created
 * with SK_F_FREE and are not sent from here.
 * NOTE(review): SK_F_FREE is assumed to mean "the entry owns the buffer
 * and releases it with free()", matching how fio_send_iolog() hands over
 * log->log - confirm against fio_net_prep_cmd().
 *
 * Returns 0 on success and 1 on failure. Without CONFIG_ZLIB this is a
 * no-op that returns 0.
 */
static int fio_send_iolog_gz(struct sk_entry *first, struct io_log *log)
{
	int ret = 0;
#ifdef CONFIG_ZLIB
	struct sk_entry *entry;
	z_stream stream;
	int zret;

	stream.zalloc = Z_NULL;
	stream.zfree = Z_NULL;
	stream.opaque = Z_NULL;

	if (deflateInit(&stream, Z_DEFAULT_COMPRESSION) != Z_OK)
		return 1;

	stream.next_in = (void *) log->log;
	stream.avail_in = log->nr_samples * log_entry_sz(log);

	do {
		unsigned int this_len;
		void *out_pdu;

		out_pdu = malloc(FIO_SERVER_MAX_FRAGMENT_PDU);
		if (!out_pdu) {
			ret = 1;
			break;
		}

		stream.avail_out = FIO_SERVER_MAX_FRAGMENT_PDU;
		stream.next_out = out_pdu;
		zret = deflate(&stream, Z_FINISH);
		/* Z_OK: more output pending, Z_STREAM_END: all done */
		if (zret < 0) {
			free(out_pdu);
			ret = 1;
			break;
		}

		this_len = FIO_SERVER_MAX_FRAGMENT_PDU - stream.avail_out;

		/* ownership of out_pdu moves to the entry */
		entry = fio_net_prep_cmd(FIO_NET_CMD_IOLOG, out_pdu, this_len,
					 NULL, SK_F_FREE | SK_F_VEC);
		flist_add_tail(&entry->list, &first->next);

		/*
		 * Input can be fully consumed while compressed output is
		 * still pending, so run until zlib reports the end of the
		 * stream instead of stopping on avail_in == 0.
		 */
	} while (zret != Z_STREAM_END);

	deflateEnd(&stream);
#endif
	return ret;
}
1560
3989b143
JA
1561int fio_send_iolog(struct thread_data *td, struct io_log *log, const char *name)
1562{
1563 struct cmd_iolog_pdu pdu;
02594e37 1564 struct sk_entry *first;
3989b143
JA
1565 int i, ret = 0;
1566
ae588852 1567 pdu.nr_samples = cpu_to_le64(log->nr_samples);
3989b143 1568 pdu.thread_number = cpu_to_le32(td->thread_number);
3989b143
JA
1569 pdu.log_type = cpu_to_le32(log->log_type);
1570 pdu.compressed = cpu_to_le32(use_zlib);
9bdb9265
JA
1571
1572 strncpy((char *) pdu.name, name, FIO_NET_NAME_MAX);
1573 pdu.name[FIO_NET_NAME_MAX - 1] = '\0';
3989b143
JA
1574
1575 for (i = 0; i < log->nr_samples; i++) {
ae588852 1576 struct io_sample *s = get_sample(log, i);
3989b143 1577
b26317c9
JA
1578 s->time = cpu_to_le64(s->time);
1579 s->val = cpu_to_le64(s->val);
1580 s->__ddir = cpu_to_le32(s->__ddir);
1581 s->bs = cpu_to_le32(s->bs);
ae588852
JA
1582
1583 if (log->log_offset) {
1584 struct io_sample_offset *so = (void *) s;
1585
1586 so->offset = cpu_to_le64(so->offset);
1587 }
3989b143
JA
1588 }
1589
1590 /*
02594e37 1591 * Assemble header entry first
3989b143 1592 */
02594e37 1593 first = fio_net_prep_cmd(FIO_NET_CMD_IOLOG, &pdu, sizeof(pdu), NULL, SK_F_COPY | SK_F_VEC);
3989b143
JA
1594
1595 /*
02594e37
JA
1596 * Now append actual log entries. Compress if we can, otherwise just
1597 * plain text output.
3989b143
JA
1598 */
1599 if (use_zlib)
02594e37 1600 ret = fio_send_iolog_gz(first, log);
f77d4b73 1601 else {
02594e37
JA
1602 struct sk_entry *entry;
1603
1604 entry = fio_net_prep_cmd(FIO_NET_CMD_IOLOG, log->log,
1605 log->nr_samples * log_entry_sz(log),
1606 NULL, SK_F_FREE | SK_F_VEC);
1607 flist_add_tail(&entry->list, &first->next);
f77d4b73 1608 }
3989b143 1609
f77d4b73 1610 return ret;
3989b143
JA
1611}
1612
2f122b13 1613void fio_server_send_add_job(struct thread_data *td)
807f9971
JA
1614{
1615 struct cmd_add_job_pdu pdu;
807f9971 1616
731e30a2 1617 memset(&pdu, 0, sizeof(pdu));
2f122b13
JA
1618 pdu.thread_number = cpu_to_le32(td->thread_number);
1619 pdu.groupid = cpu_to_le32(td->groupid);
1620 convert_thread_options_to_net(&pdu.top, &td->o);
807f9971 1621
f77d4b73 1622 fio_net_queue_cmd(FIO_NET_CMD_ADD_JOB, &pdu, sizeof(pdu), NULL, SK_F_COPY);
807f9971
JA
1623}
1624
122c7725
JA
1625void fio_server_send_start(struct thread_data *td)
1626{
f77d4b73
JA
1627 struct sk_out *sk_out = pthread_getspecific(sk_out_key);
1628
1629 assert(sk_out->sk != -1);
122c7725 1630
f77d4b73 1631 fio_net_queue_cmd(FIO_NET_CMD_SERVER_START, NULL, 0, 0, SK_F_SIMPLE);
122c7725
JA
1632}
1633
ca09be4b 1634int fio_server_get_verify_state(const char *name, int threadnumber,
c3546b53 1635 void **datap, int *version)
ca09be4b
JA
1636{
1637 struct thread_io_list *s;
1638 struct cmd_sendfile out;
1639 struct cmd_reply *rep;
1640 uint64_t tag;
1641 void *data;
1642
1643 dprint(FD_NET, "server: request verify state\n");
1644
1645 rep = smalloc(sizeof(*rep));
1646 if (!rep) {
1647 log_err("fio: smalloc pool too small\n");
1648 return 1;
1649 }
1650
1651 __fio_mutex_init(&rep->lock, FIO_MUTEX_LOCKED);
1652 rep->data = NULL;
1653 rep->error = 0;
1654
e499aedc
JA
1655 verify_state_gen_name((char *) out.path, sizeof(out.path), name, me,
1656 threadnumber);
ca09be4b 1657 tag = (uint64_t) (uintptr_t) rep;
f77d4b73 1658 fio_net_queue_cmd(FIO_NET_CMD_SENDFILE, &out, sizeof(out), &tag, SK_F_COPY);
ca09be4b
JA
1659
1660 /*
1661 * Wait for the backend to receive the reply
1662 */
09400a60 1663 if (fio_mutex_down_timeout(&rep->lock, 10000)) {
ca09be4b
JA
1664 log_err("fio: timed out waiting for reply\n");
1665 goto fail;
1666 }
1667
1668 if (rep->error) {
1669 log_err("fio: failure on receiving state file: %s\n", strerror(rep->error));
1670fail:
1671 *datap = NULL;
1672 sfree(rep);
f77d4b73 1673 fio_net_queue_quit();
ca09be4b
JA
1674 return 1;
1675 }
1676
1677 /*
1678 * The format is verify_state_hdr, then thread_io_list. Verify
1679 * the header, and the thread_io_list checksum
1680 */
1681 s = rep->data + sizeof(struct verify_state_hdr);
c3546b53 1682 if (verify_state_hdr(rep->data, s, version))
ca09be4b
JA
1683 goto fail;
1684
1685 /*
1686 * Don't need the header from now, copy just the thread_io_list
1687 */
1688 rep->size -= sizeof(struct verify_state_hdr);
1689 data = malloc(rep->size);
1690 memcpy(data, s, rep->size);
1691 *datap = data;
1692
1693 sfree(rep->data);
f5a42524 1694 __fio_mutex_remove(&rep->lock);
ca09be4b
JA
1695 sfree(rep);
1696 return 0;
1697}
1698
87aa8f19 1699static int fio_init_server_ip(void)
81179eec 1700{
811826be 1701 struct sockaddr *addr;
67bf9823 1702 socklen_t socklen;
17e3531a
JA
1703 char buf[80];
1704 const char *str;
87aa8f19 1705 int sk, opt;
81179eec 1706
811826be
JA
1707 if (use_ipv6)
1708 sk = socket(AF_INET6, SOCK_STREAM, 0);
1709 else
1710 sk = socket(AF_INET, SOCK_STREAM, 0);
1711
81179eec
JA
1712 if (sk < 0) {
1713 log_err("fio: socket: %s\n", strerror(errno));
1714 return -1;
1715 }
1716
1717 opt = 1;
1f81991e 1718 if (setsockopt(sk, SOL_SOCKET, SO_REUSEADDR, (void *)&opt, sizeof(opt)) < 0) {
669e8bf8 1719 log_err("fio: setsockopt(REUSEADDR): %s\n", strerror(errno));
b94cba47 1720 close(sk);
81179eec
JA
1721 return -1;
1722 }
1723#ifdef SO_REUSEPORT
6eb24791 1724 if (setsockopt(sk, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt)) < 0) {
669e8bf8 1725 log_err("fio: setsockopt(REUSEPORT): %s\n", strerror(errno));
b94cba47 1726 close(sk);
81179eec
JA
1727 return -1;
1728 }
1729#endif
1730
811826be 1731 if (use_ipv6) {
17e3531a
JA
1732 const void *src = &saddr_in6.sin6_addr;
1733
811826be
JA
1734 addr = (struct sockaddr *) &saddr_in6;
1735 socklen = sizeof(saddr_in6);
1736 saddr_in6.sin6_family = AF_INET6;
17e3531a 1737 str = inet_ntop(AF_INET6, src, buf, sizeof(buf));
811826be 1738 } else {
17e3531a
JA
1739 const void *src = &saddr_in.sin_addr;
1740
811826be
JA
1741 addr = (struct sockaddr *) &saddr_in;
1742 socklen = sizeof(saddr_in);
1743 saddr_in.sin_family = AF_INET;
17e3531a 1744 str = inet_ntop(AF_INET, src, buf, sizeof(buf));
811826be 1745 }
81179eec 1746
811826be 1747 if (bind(sk, addr, socklen) < 0) {
81179eec 1748 log_err("fio: bind: %s\n", strerror(errno));
17e3531a 1749 log_info("fio: failed with IPv%c %s\n", use_ipv6 ? '6' : '4', str);
b94cba47 1750 close(sk);
81179eec
JA
1751 return -1;
1752 }
1753
87aa8f19
JA
1754 return sk;
1755}
1756
1757static int fio_init_server_sock(void)
1758{
1759 struct sockaddr_un addr;
67bf9823 1760 socklen_t len;
87aa8f19
JA
1761 mode_t mode;
1762 int sk;
1763
1764 sk = socket(AF_UNIX, SOCK_STREAM, 0);
1765 if (sk < 0) {
1766 log_err("fio: socket: %s\n", strerror(errno));
1767 return -1;
1768 }
1769
1770 mode = umask(000);
1771
1772 memset(&addr, 0, sizeof(addr));
1773 addr.sun_family = AF_UNIX;
a48fddbc 1774 strncpy(addr.sun_path, bind_sock, sizeof(addr.sun_path) - 1);
87aa8f19
JA
1775
1776 len = sizeof(addr.sun_family) + strlen(bind_sock) + 1;
1777
1778 if (bind(sk, (struct sockaddr *) &addr, len) < 0) {
1779 log_err("fio: bind: %s\n", strerror(errno));
b94cba47 1780 close(sk);
87aa8f19
JA
1781 return -1;
1782 }
1783
1784 umask(mode);
1785 return sk;
1786}
1787
1788static int fio_init_server_connection(void)
1789{
bebe6398 1790 char bind_str[128];
87aa8f19
JA
1791 int sk;
1792
1793 dprint(FD_NET, "starting server\n");
1794
1795 if (!bind_sock)
1796 sk = fio_init_server_ip();
1797 else
1798 sk = fio_init_server_sock();
1799
1800 if (sk < 0)
1801 return sk;
1802
afdcad23
JA
1803 memset(bind_str, 0, sizeof(bind_str));
1804
811826be
JA
1805 if (!bind_sock) {
1806 char *p, port[16];
1807 const void *src;
1808 int af;
1809
1810 if (use_ipv6) {
1811 af = AF_INET6;
1812 src = &saddr_in6.sin6_addr;
1813 } else {
1814 af = AF_INET;
1815 src = &saddr_in.sin_addr;
1816 }
1817
1818 p = (char *) inet_ntop(af, src, bind_str, sizeof(bind_str));
1819
1820 sprintf(port, ",%u", fio_net_port);
1821 if (p)
1822 strcat(p, port);
1823 else
afdcad23 1824 strncpy(bind_str, port, sizeof(bind_str) - 1);
811826be 1825 } else
afdcad23 1826 strncpy(bind_str, bind_sock, sizeof(bind_str) - 1);
bebe6398
JA
1827
1828 log_info("fio: server listening on %s\n", bind_str);
1829
89c1707c 1830 if (listen(sk, 0) < 0) {
81179eec 1831 log_err("fio: listen: %s\n", strerror(errno));
2fd973c9 1832 close(sk);
81179eec
JA
1833 return -1;
1834 }
1835
87aa8f19
JA
1836 return sk;
1837}
1838
/*
 * Turn 'host' into a binary address. It is first tried as a literal
 * address of the requested family and, failing that, resolved with
 * getaddrinfo(). The result goes to 'inp6' when 'ipv6' is set and to
 * 'inp' otherwise.
 *
 * Returns 0 on success, 1 if the name cannot be resolved.
 */
int fio_server_parse_host(const char *host, int ipv6, struct in_addr *inp,
			  struct in6_addr *inp6)
{
	const int af = ipv6 ? AF_INET6 : AF_INET;
	struct addrinfo hints, *res;
	int ret;

	/* literal address? then there is nothing to resolve */
	if (ipv6)
		ret = inet_pton(af, host, inp6);
	else
		ret = inet_pton(af, host, inp);
	if (ret == 1)
		return 0;

	memset(&hints, 0, sizeof(hints));
	hints.ai_family = af;
	hints.ai_socktype = SOCK_STREAM;

	ret = getaddrinfo(host, NULL, &hints, &res);
	if (ret) {
		log_err("fio: failed to resolve <%s> (%s)\n", host,
			gai_strerror(ret));
		return 1;
	}

	/* use the first result */
	if (ipv6)
		memcpy(inp6, &((struct sockaddr_in6 *) res->ai_addr)->sin6_addr, sizeof(*inp6));
	else
		memcpy(inp, &((struct sockaddr_in *) res->ai_addr)->sin_addr, sizeof(*inp));

	freeaddrinfo(res);
	return 0;
}
1875
660a2bfb
JA
1876/*
1877 * Parse a host/ip/port string. Reads from 'str'.
1878 *
1879 * Outputs:
1880 *
1881 * For IPv4:
1882 * *ptr is the host, *port is the port, inp is the destination.
1883 * For IPv6:
1884 * *ptr is the host, *port is the port, inp6 is the dest, and *ipv6 is 1.
1885 * For local domain sockets:
1886 * *ptr is the filename, *is_sock is 1.
1887 */
bebe6398 1888int fio_server_parse_string(const char *str, char **ptr, int *is_sock,
811826be
JA
1889 int *port, struct in_addr *inp,
1890 struct in6_addr *inp6, int *ipv6)
bebe6398 1891{
76867621
JA
1892 const char *host = str;
1893 char *portp;
3ec62ec4 1894 int lport = 0;
76867621 1895
bebe6398
JA
1896 *ptr = NULL;
1897 *is_sock = 0;
6d2cf394 1898 *port = fio_net_port;
811826be 1899 *ipv6 = 0;
bebe6398
JA
1900
1901 if (!strncmp(str, "sock:", 5)) {
1902 *ptr = strdup(str + 5);
1903 *is_sock = 1;
76867621
JA
1904
1905 return 0;
1906 }
1907
1908 /*
1909 * Is it ip:<ip or host>:port
1910 */
1911 if (!strncmp(host, "ip:", 3))
1912 host += 3;
1913 else if (!strncmp(host, "ip4:", 4))
1914 host += 4;
1915 else if (!strncmp(host, "ip6:", 4)) {
1916 host += 4;
1917 *ipv6 = 1;
1918 } else if (host[0] == ':') {
1919 /* String is :port */
1920 host++;
1921 lport = atoi(host);
1922 if (!lport || lport > 65535) {
4e0a8fa2 1923 log_err("fio: bad server port %u\n", lport);
76867621
JA
1924 return 1;
1925 }
1926 /* no hostname given, we are done */
1927 *port = lport;
1928 return 0;
1929 }
1930
1931 /*
b96b90c3 1932 * If no port seen yet, check if there's a last ',' at the end
76867621
JA
1933 */
1934 if (!lport) {
1935 portp = strchr(host, ',');
1936 if (portp) {
1937 *portp = '\0';
1938 portp++;
1939 lport = atoi(portp);
bebe6398 1940 if (!lport || lport > 65535) {
4e0a8fa2 1941 log_err("fio: bad server port %u\n", lport);
bebe6398
JA
1942 return 1;
1943 }
bebe6398 1944 }
76867621 1945 }
bebe6398 1946
76867621
JA
1947 if (lport)
1948 *port = lport;
bebe6398 1949
76867621
JA
1950 if (!strlen(host))
1951 return 0;
bebe6398 1952
76867621 1953 *ptr = strdup(host);
811826be 1954
3aa3ceeb 1955 if (fio_server_parse_host(*ptr, *ipv6, inp, inp6)) {
3ec62ec4
JA
1956 free(*ptr);
1957 *ptr = NULL;
1958 return 1;
bebe6398
JA
1959 }
1960
1961 if (*port == 0)
1962 *port = fio_net_port;
1963
1964 return 0;
1965}
1966
87aa8f19
JA
1967/*
1968 * Server arg should be one of:
1969 *
1970 * sock:/path/to/socket
1971 * ip:1.2.3.4
1972 * 1.2.3.4
1973 *
1974 * Where sock uses unix domain sockets, and ip binds the server to
1975 * a specific interface. If no arguments are given to the server, it
1976 * uses IP and binds to 0.0.0.0.
1977 *
1978 */
1979static int fio_handle_server_arg(void)
1980{
6d2cf394 1981 int port = fio_net_port;
a7de0a11 1982 int is_sock, ret = 0;
bebe6398 1983
87aa8f19
JA
1984 saddr_in.sin_addr.s_addr = htonl(INADDR_ANY);
1985
1986 if (!fio_server_arg)
a7de0a11 1987 goto out;
87aa8f19 1988
4e5b8fb8 1989 ret = fio_server_parse_string(fio_server_arg, &bind_sock, &is_sock,
811826be
JA
1990 &port, &saddr_in.sin_addr,
1991 &saddr_in6.sin6_addr, &use_ipv6);
4e5b8fb8
JA
1992
1993 if (!is_sock && bind_sock) {
1994 free(bind_sock);
1995 bind_sock = NULL;
1996 }
1997
a7de0a11 1998out:
6d2cf394
JA
1999 fio_net_port = port;
2000 saddr_in.sin_port = htons(port);
811826be 2001 saddr_in6.sin6_port = htons(port);
4e5b8fb8 2002 return ret;
87aa8f19
JA
2003}
2004
43cdea1d
JA
2005static void sig_int(int sig)
2006{
2007 if (bind_sock)
2008 unlink(bind_sock);
2009}
2010
2011static void set_sig_handlers(void)
2012{
2013 struct sigaction act;
2014
2015 memset(&act, 0, sizeof(act));
2016 act.sa_handler = sig_int;
2017 act.sa_flags = SA_RESTART;
2018 sigaction(SIGINT, &act, NULL);
2019}
2020
87aa8f19
JA
2021static int fio_server(void)
2022{
2023 int sk, ret;
2024
2025 dprint(FD_NET, "starting server\n");
2026
2027 if (fio_handle_server_arg())
2028 return -1;
2029
2030 sk = fio_init_server_connection();
2031 if (sk < 0)
2032 return -1;
81179eec 2033
43cdea1d
JA
2034 set_sig_handlers();
2035
f77d4b73
JA
2036 if (pthread_key_create(&sk_out_key, NULL))
2037 log_err("fio: can't create sk_out backend key\n");
2038
81179eec 2039 ret = accept_loop(sk);
87aa8f19 2040
81179eec 2041 close(sk);
87aa8f19
JA
2042
2043 if (fio_server_arg) {
2044 free(fio_server_arg);
2045 fio_server_arg = NULL;
2046 }
bebe6398
JA
2047 if (bind_sock)
2048 free(bind_sock);
87aa8f19 2049
81179eec
JA
2050 return ret;
2051}
2052
7b821684 2053void fio_server_got_signal(int signal)
9abea48b 2054{
f77d4b73
JA
2055 struct sk_out *sk_out = pthread_getspecific(sk_out_key);
2056
2057 assert(sk_out);
2058
7b821684 2059 if (signal == SIGPIPE)
f77d4b73 2060 sk_out->sk = -1;
7b821684
JA
2061 else {
2062 log_info("\nfio: terminating on signal %d\n", signal);
2063 exit_backend = 1;
2064 }
9abea48b
JA
2065}
2066
13755d94
JA
/*
 * Check whether 'pidfile' refers to a server that is still alive.
 *
 * Returns 0 if the file cannot be opened, or if the pid in it no longer
 * exists (kill() fails with ESRCH). Returns 1 otherwise, which includes
 * an empty or unparsable file.
 */
static int check_existing_pidfile(const char *pidfile)
{
	char buf[16];
	size_t nread;
	pid_t pid;
	FILE *f;

	f = fopen(pidfile, "r");
	if (!f)
		return 0;

	/*
	 * Read at most what fits, whatever the file size is, and
	 * terminate the buffer before handing it to atoi().
	 */
	nread = fread(buf, 1, sizeof(buf) - 1, f);
	fclose(f);
	if (!nread)
		return 1;
	buf[nread] = '\0';

	pid = atoi(buf);

	/*
	 * kill() with a pid <= 0 addresses process groups, not a single
	 * process. Stay conservative and report "alive" without sending
	 * anything.
	 */
	if (pid <= 0)
		return 1;

	if (kill(pid, SIGCONT) < 0)
		return errno != ESRCH;

	return 1;
}
2093
/*
 * Write 'pid' followed by a newline to 'pidfile', replacing any
 * previous content.
 *
 * Returns 0 on success, 1 after logging an error if the file cannot be
 * opened or written.
 */
static int write_pid(pid_t pid, const char *pidfile)
{
	FILE *fpid;
	int failed;

	fpid = fopen(pidfile, "w");
	if (!fpid) {
		log_err("fio: failed opening pid file %s\n", pidfile);
		return 1;
	}

	failed = fprintf(fpid, "%u\n", (unsigned int) pid) < 0;

	/* buffered data is written out by fclose(), so check it too */
	if (fclose(fpid))
		failed = 1;

	if (failed) {
		log_err("fio: failed writing pid file %s\n", pidfile);
		return 1;
	}

	return 0;
}
2108
2109/*
2110 * If pidfile is specified, background us.
2111 */
2112int fio_start_server(char *pidfile)
e46d8091
JA
2113{
2114 pid_t pid;
402668f3 2115 int ret;
e46d8091 2116
93bcfd20 2117#if defined(WIN32)
905c78b9 2118 WSADATA wsd;
3c3ed070 2119 WSAStartup(MAKEWORD(2, 2), &wsd);
93bcfd20
BC
2120#endif
2121
402668f3 2122 if (!pidfile)
e46d8091
JA
2123 return fio_server();
2124
13755d94
JA
2125 if (check_existing_pidfile(pidfile)) {
2126 log_err("fio: pidfile %s exists and server appears alive\n",
2127 pidfile);
b8ba87ac 2128 free(pidfile);
13755d94
JA
2129 return -1;
2130 }
2131
e46d8091
JA
2132 pid = fork();
2133 if (pid < 0) {
13755d94 2134 log_err("fio: failed server fork: %s", strerror(errno));
402668f3 2135 free(pidfile);
c28e8e8c 2136 return -1;
402668f3 2137 } else if (pid) {
8a1db9a1 2138 ret = write_pid(pid, pidfile);
b8ba87ac 2139 free(pidfile);
44c8268d 2140 _exit(ret);
402668f3 2141 }
e46d8091
JA
2142
2143 setsid();
13755d94
JA
2144 openlog("fio", LOG_NDELAY|LOG_NOWAIT|LOG_PID, LOG_USER);
2145 log_syslog = 1;
e46d8091
JA
2146 close(STDIN_FILENO);
2147 close(STDOUT_FILENO);
2148 close(STDERR_FILENO);
2149 f_out = NULL;
2150 f_err = NULL;
402668f3
JA
2151
2152 ret = fio_server();
2153
2154 closelog();
2155 unlink(pidfile);
2156 free(pidfile);
2157 return ret;
e46d8091 2158}
87aa8f19 2159
bebe6398 2160void fio_server_set_arg(const char *arg)
87aa8f19
JA
2161{
2162 fio_server_arg = strdup(arg);
2163}