Separate the act of adding and starting a job
[fio.git] / server.c
1 #include <stdio.h>
2 #include <stdlib.h>
3 #include <stdarg.h>
4 #include <unistd.h>
5 #include <limits.h>
6 #include <errno.h>
7 #include <fcntl.h>
8 #include <sys/poll.h>
9 #include <sys/types.h>
10 #include <sys/wait.h>
11 #include <sys/socket.h>
12 #include <sys/stat.h>
13 #include <sys/un.h>
14 #include <netinet/in.h>
15 #include <arpa/inet.h>
16 #include <netdb.h>
17 #include <syslog.h>
18 #include <signal.h>
19
20 #include "fio.h"
21 #include "server.h"
22 #include "crc/crc16.h"
23 #include "lib/ieee754.h"
24
25 #include "fio_version.h"
26
27 int fio_net_port = FIO_NET_PORT;
28
29 int exit_backend = 0;
30
31 static int server_fd = -1;
32 static char *fio_server_arg;
33 static char *bind_sock;
34 static struct sockaddr_in saddr_in;
35 static struct sockaddr_in6 saddr_in6;
36 static int first_cmd_check;
37 static int use_ipv6;
38
39 static const char *fio_server_ops[FIO_NET_CMD_NR] = {
40         "",
41         "QUIT",
42         "EXIT",
43         "JOB",
44         "JOBLINE",
45         "TEXT",
46         "TS",
47         "GS",
48         "SEND_ETA",
49         "ETA",
50         "PROBE",
51         "START",
52         "STOP",
53         "DISK_UTIL",
54         "SERVER_START",
55         "ADD_JOB",
56         "CMD_RUN"
57 };
58
59 const char *fio_server_op(unsigned int op)
60 {
61         static char buf[32];
62
63         if (op < FIO_NET_CMD_NR)
64                 return fio_server_ops[op];
65
66         sprintf(buf, "UNKNOWN/%d", op);
67         return buf;
68 }
69
70 int fio_send_data(int sk, const void *p, unsigned int len)
71 {
72         assert(len <= sizeof(struct fio_net_cmd) + FIO_SERVER_MAX_FRAGMENT_PDU);
73
74         do {
75                 int ret = send(sk, p, len, 0);
76
77                 if (ret > 0) {
78                         len -= ret;
79                         if (!len)
80                                 break;
81                         p += ret;
82                         continue;
83                 } else if (!ret)
84                         break;
85                 else if (errno == EAGAIN || errno == EINTR)
86                         continue;
87                 else
88                         break;
89         } while (!exit_backend);
90
91         if (!len)
92                 return 0;
93
94         return 1;
95 }
96
97 int fio_recv_data(int sk, void *p, unsigned int len)
98 {
99         do {
100                 int ret = recv(sk, p, len, MSG_WAITALL);
101
102                 if (ret > 0) {
103                         len -= ret;
104                         if (!len)
105                                 break;
106                         p += ret;
107                         continue;
108                 } else if (!ret)
109                         break;
110                 else if (errno == EAGAIN || errno == EINTR)
111                         continue;
112                 else
113                         break;
114         } while (!exit_backend);
115
116         if (!len)
117                 return 0;
118
119         return -1;
120 }
121
122 static int verify_convert_cmd(struct fio_net_cmd *cmd)
123 {
124         uint16_t crc;
125
126         cmd->cmd_crc16 = le16_to_cpu(cmd->cmd_crc16);
127         cmd->pdu_crc16 = le16_to_cpu(cmd->pdu_crc16);
128
129         crc = fio_crc16(cmd, FIO_NET_CMD_CRC_SZ);
130         if (crc != cmd->cmd_crc16) {
131                 log_err("fio: server bad crc on command (got %x, wanted %x)\n",
132                                 cmd->cmd_crc16, crc);
133                 return 1;
134         }
135
136         cmd->version    = le16_to_cpu(cmd->version);
137         cmd->opcode     = le16_to_cpu(cmd->opcode);
138         cmd->flags      = le32_to_cpu(cmd->flags);
139         cmd->tag        = le64_to_cpu(cmd->tag);
140         cmd->pdu_len    = le32_to_cpu(cmd->pdu_len);
141
142         switch (cmd->version) {
143         case FIO_SERVER_VER:
144                 break;
145         default:
146                 log_err("fio: bad server cmd version %d\n", cmd->version);
147                 return 1;
148         }
149
150         if (cmd->pdu_len > FIO_SERVER_MAX_FRAGMENT_PDU) {
151                 log_err("fio: command payload too large: %u\n", cmd->pdu_len);
152                 return 1;
153         }
154
155         return 0;
156 }
157
158 /*
159  * Read (and defragment, if necessary) incoming commands
160  */
161 struct fio_net_cmd *fio_net_recv_cmd(int sk)
162 {
163         struct fio_net_cmd cmd, *cmdret = NULL;
164         size_t cmd_size = 0, pdu_offset = 0;
165         uint16_t crc;
166         int ret, first = 1;
167         void *pdu = NULL;
168
169         do {
170                 ret = fio_recv_data(sk, &cmd, sizeof(cmd));
171                 if (ret)
172                         break;
173
174                 /* We have a command, verify it and swap if need be */
175                 ret = verify_convert_cmd(&cmd);
176                 if (ret)
177                         break;
178
179                 if (first) {
180                         /* if this is text, add room for \0 at the end */
181                         cmd_size = sizeof(cmd) + cmd.pdu_len + 1;
182                         assert(!cmdret);
183                 } else
184                         cmd_size += cmd.pdu_len;
185
186                 cmdret = realloc(cmdret, cmd_size);
187
188                 if (first)
189                         memcpy(cmdret, &cmd, sizeof(cmd));
190                 else if (cmdret->opcode != cmd.opcode) {
191                         log_err("fio: fragment opcode mismatch (%d != %d)\n",
192                                         cmdret->opcode, cmd.opcode);
193                         ret = 1;
194                         break;
195                 }
196
197                 if (!cmd.pdu_len)
198                         break;
199
200                 /* There's payload, get it */
201                 pdu = (void *) cmdret->payload + pdu_offset;
202                 ret = fio_recv_data(sk, pdu, cmd.pdu_len);
203                 if (ret)
204                         break;
205
206                 /* Verify payload crc */
207                 crc = fio_crc16(pdu, cmd.pdu_len);
208                 if (crc != cmd.pdu_crc16) {
209                         log_err("fio: server bad crc on payload ");
210                         log_err("(got %x, wanted %x)\n", cmd.pdu_crc16, crc);
211                         ret = 1;
212                         break;
213                 }
214
215                 pdu_offset += cmd.pdu_len;
216                 if (!first)
217                         cmdret->pdu_len += cmd.pdu_len;
218                 first = 0;
219         } while (cmd.flags & FIO_NET_CMD_F_MORE);
220
221         if (ret) {
222                 free(cmdret);
223                 cmdret = NULL;
224         } else if (cmdret) {
225                 /* zero-terminate text input */
226                 if (cmdret->pdu_len) {
227                         if (cmdret->opcode == FIO_NET_CMD_TEXT) {
228                                 struct cmd_text_pdu *pdu = (struct cmd_text_pdu *) cmdret->payload;
229                                 char *buf = (char *) pdu->buf;
230
231                                 buf[pdu->buf_len ] = '\0';
232                         } else if (cmdret->opcode == FIO_NET_CMD_JOB) {
233                                 char *buf = (char *) cmdret->payload;
234
235                                 buf[cmdret->pdu_len ] = '\0';
236                         }
237                 }
238
239                 /* frag flag is internal */
240                 cmdret->flags &= ~FIO_NET_CMD_F_MORE;
241         }
242
243         return cmdret;
244 }
245
246 void fio_net_cmd_crc(struct fio_net_cmd *cmd)
247 {
248         uint32_t pdu_len;
249
250         cmd->cmd_crc16 = __cpu_to_le16(fio_crc16(cmd, FIO_NET_CMD_CRC_SZ));
251
252         pdu_len = le32_to_cpu(cmd->pdu_len);
253         if (pdu_len)
254                 cmd->pdu_crc16 = __cpu_to_le16(fio_crc16(cmd->payload, pdu_len));
255 }
256
257 int fio_net_send_cmd(int fd, uint16_t opcode, const void *buf, off_t size,
258                      uint64_t tag)
259 {
260         struct fio_net_cmd *cmd = NULL;
261         size_t this_len, cur_len = 0;
262         int ret;
263
264         do {
265                 this_len = size;
266                 if (this_len > FIO_SERVER_MAX_FRAGMENT_PDU)
267                         this_len = FIO_SERVER_MAX_FRAGMENT_PDU;
268
269                 if (!cmd || cur_len < sizeof(*cmd) + this_len) {
270                         if (cmd)
271                                 free(cmd);
272
273                         cur_len = sizeof(*cmd) + this_len;
274                         cmd = malloc(cur_len);
275                 }
276
277                 fio_init_net_cmd(cmd, opcode, buf, this_len, tag);
278
279                 if (this_len < size)
280                         cmd->flags = __cpu_to_le32(FIO_NET_CMD_F_MORE);
281
282                 fio_net_cmd_crc(cmd);
283
284                 ret = fio_send_data(fd, cmd, sizeof(*cmd) + this_len);
285                 size -= this_len;
286                 buf += this_len;
287         } while (!ret && size);
288
289         if (cmd)
290                 free(cmd);
291
292         return ret;
293 }
294
295 static int fio_net_send_simple_stack_cmd(int sk, uint16_t opcode, uint64_t tag)
296 {
297         struct fio_net_cmd cmd;
298
299         fio_init_net_cmd(&cmd, opcode, NULL, 0, tag);
300         fio_net_cmd_crc(&cmd);
301
302         return fio_send_data(sk, &cmd, sizeof(cmd));
303 }
304
305 /*
306  * If 'list' is non-NULL, then allocate and store the sent command for
307  * later verification.
308  */
309 int fio_net_send_simple_cmd(int sk, uint16_t opcode, uint64_t tag,
310                             struct flist_head *list)
311 {
312         struct fio_net_int_cmd *cmd;
313         int ret;
314
315         if (!list)
316                 return fio_net_send_simple_stack_cmd(sk, opcode, tag);
317
318         cmd = malloc(sizeof(*cmd));
319
320         fio_init_net_cmd(&cmd->cmd, opcode, NULL, 0, (uintptr_t) cmd);
321         fio_net_cmd_crc(&cmd->cmd);
322
323         INIT_FLIST_HEAD(&cmd->list);
324         gettimeofday(&cmd->tv, NULL);
325         cmd->saved_tag = tag;
326
327         ret = fio_send_data(sk, &cmd->cmd, sizeof(cmd->cmd));
328         if (ret) {
329                 free(cmd);
330                 return ret;
331         }
332
333         flist_add_tail(&cmd->list, list);
334         return 0;
335 }
336
337 static int fio_server_send_quit_cmd(void)
338 {
339         dprint(FD_NET, "server: sending quit\n");
340         return fio_net_send_simple_cmd(server_fd, FIO_NET_CMD_QUIT, 0, NULL);
341 }
342
343 static int handle_run_cmd(struct fio_net_cmd *cmd)
344 {
345         struct cmd_end_pdu epdu;
346         int ret;
347
348         ret = fio_backend();
349
350         epdu.error = ret;
351         fio_net_send_cmd(server_fd, FIO_NET_CMD_STOP, &epdu, sizeof(epdu), 0);
352
353         fio_server_send_quit_cmd();
354         reset_fio_state();
355         first_cmd_check = 0;
356         return ret;
357 }
358
359 static int handle_job_cmd(struct fio_net_cmd *cmd)
360 {
361         char *buf = (char *) cmd->payload;
362         struct cmd_start_pdu spdu;
363
364         if (parse_jobs_ini(buf, 1, 0)) {
365                 fio_server_send_quit_cmd();
366                 return -1;
367         }
368
369         spdu.jobs = cpu_to_le32(thread_number);
370         fio_net_send_cmd(server_fd, FIO_NET_CMD_START, &spdu, sizeof(spdu), 0);
371         return 0;
372 }
373
374 static int handle_jobline_cmd(struct fio_net_cmd *cmd)
375 {
376         void *pdu = cmd->payload;
377         struct cmd_single_line_pdu *cslp;
378         struct cmd_line_pdu *clp;
379         unsigned long offset;
380         struct cmd_start_pdu spdu;
381         char **argv;
382         int i;
383
384         clp = pdu;
385         clp->lines = le16_to_cpu(clp->lines);
386         argv = malloc(clp->lines * sizeof(char *));
387         offset = sizeof(*clp);
388
389         dprint(FD_NET, "server: %d command line args\n", clp->lines);
390
391         for (i = 0; i < clp->lines; i++) {
392                 cslp = pdu + offset;
393                 argv[i] = (char *) cslp->text;
394
395                 offset += sizeof(*cslp) + le16_to_cpu(cslp->len);
396                 dprint(FD_NET, "server: %d: %s\n", i, argv[i]);
397         }
398
399         if (parse_cmd_line(clp->lines, argv)) {
400                 fio_server_send_quit_cmd();
401                 free(argv);
402                 return -1;
403         }
404
405         free(argv);
406
407         spdu.jobs = cpu_to_le32(thread_number);
408         fio_net_send_cmd(server_fd, FIO_NET_CMD_START, &spdu, sizeof(spdu), 0);
409         return 0;
410 }
411
412 static int handle_probe_cmd(struct fio_net_cmd *cmd)
413 {
414         struct cmd_probe_pdu probe;
415
416         dprint(FD_NET, "server: sending probe reply\n");
417
418         memset(&probe, 0, sizeof(probe));
419         gethostname((char *) probe.hostname, sizeof(probe.hostname));
420 #ifdef FIO_BIG_ENDIAN
421         probe.bigendian = 1;
422 #endif
423         probe.fio_major = FIO_MAJOR;
424         probe.fio_minor = FIO_MINOR;
425         probe.fio_patch = FIO_PATCH;
426
427         probe.os        = FIO_OS;
428         probe.arch      = FIO_ARCH;
429
430         probe.bpp       = sizeof(void *);
431
432         return fio_net_send_cmd(server_fd, FIO_NET_CMD_PROBE, &probe, sizeof(probe), cmd->tag);
433 }
434
435 static int handle_send_eta_cmd(struct fio_net_cmd *cmd)
436 {
437         struct jobs_eta *je;
438         size_t size;
439         int i;
440
441         if (!thread_number)
442                 return 0;
443
444         size = sizeof(*je) + thread_number * sizeof(char) + 1;
445         je = malloc(size);
446         memset(je, 0, size);
447
448         if (!calc_thread_status(je, 1)) {
449                 free(je);
450                 return 0;
451         }
452
453         dprint(FD_NET, "server sending status\n");
454
455         je->nr_running          = cpu_to_le32(je->nr_running);
456         je->nr_ramp             = cpu_to_le32(je->nr_ramp);
457         je->nr_pending          = cpu_to_le32(je->nr_pending);
458         je->files_open          = cpu_to_le32(je->files_open);
459
460         for (i = 0; i < 2; i++) {
461                 je->m_rate[i]   = cpu_to_le32(je->m_rate[i]);
462                 je->t_rate[i]   = cpu_to_le32(je->t_rate[i]);
463                 je->m_iops[i]   = cpu_to_le32(je->m_iops[i]);
464                 je->t_iops[i]   = cpu_to_le32(je->t_iops[i]);
465                 je->rate[i]     = cpu_to_le32(je->rate[i]);
466                 je->iops[i]     = cpu_to_le32(je->iops[i]);
467         }
468
469         je->elapsed_sec         = cpu_to_le64(je->elapsed_sec);
470         je->eta_sec             = cpu_to_le64(je->eta_sec);
471         je->nr_threads          = cpu_to_le32(je->nr_threads);
472
473         fio_net_send_cmd(server_fd, FIO_NET_CMD_ETA, je, size, cmd->tag);
474         free(je);
475         return 0;
476 }
477
478 static int handle_command(struct fio_net_cmd *cmd)
479 {
480         int ret;
481
482         dprint(FD_NET, "server: got op [%s], pdu=%u, tag=%lx\n",
483                         fio_server_op(cmd->opcode), cmd->pdu_len, cmd->tag);
484
485         switch (cmd->opcode) {
486         case FIO_NET_CMD_QUIT:
487                 fio_terminate_threads(TERMINATE_ALL);
488                 return -1;
489         case FIO_NET_CMD_EXIT:
490                 exit_backend = 1;
491                 return -1;
492         case FIO_NET_CMD_JOB:
493                 ret = handle_job_cmd(cmd);
494                 break;
495         case FIO_NET_CMD_JOBLINE:
496                 ret = handle_jobline_cmd(cmd);
497                 break;
498         case FIO_NET_CMD_PROBE:
499                 ret = handle_probe_cmd(cmd);
500                 break;
501         case FIO_NET_CMD_SEND_ETA:
502                 ret = handle_send_eta_cmd(cmd);
503                 break;
504         case FIO_NET_CMD_RUN:
505                 ret = handle_run_cmd(cmd);
506                 break;
507         default:
508                 log_err("fio: unknown opcode: %s\n",fio_server_op(cmd->opcode));
509                 ret = 1;
510         }
511
512         return ret;
513 }
514
515 static int handle_connection(int sk, int block)
516 {
517         struct fio_net_cmd *cmd = NULL;
518         int ret = 0;
519
520         /* read forever */
521         while (!exit_backend) {
522                 struct pollfd pfd = {
523                         .fd     = sk,
524                         .events = POLLIN,
525                 };
526
527                 ret = 0;
528                 do {
529                         ret = poll(&pfd, 1, 100);
530                         if (ret < 0) {
531                                 if (errno == EINTR)
532                                         break;
533                                 log_err("fio: poll: %s\n", strerror(errno));
534                                 break;
535                         } else if (!ret) {
536                                 if (!block)
537                                         return 0;
538                                 continue;
539                         }
540
541                         if (pfd.revents & POLLIN)
542                                 break;
543                         if (pfd.revents & (POLLERR|POLLHUP)) {
544                                 ret = 1;
545                                 break;
546                         }
547                 } while (!exit_backend);
548
549                 if (ret < 0)
550                         break;
551
552                 cmd = fio_net_recv_cmd(sk);
553                 if (!cmd) {
554                         ret = -1;
555                         break;
556                 }
557
558                 ret = handle_command(cmd);
559                 if (ret)
560                         break;
561
562                 free(cmd);
563                 cmd = NULL;
564         }
565
566         if (cmd)
567                 free(cmd);
568
569         return ret;
570 }
571
572 void fio_server_idle_loop(void)
573 {
574         if (!first_cmd_check) {
575                 fio_net_send_simple_cmd(server_fd, FIO_NET_CMD_SERVER_START, 0, NULL);
576                 first_cmd_check = 1;
577         }
578         if (server_fd != -1)
579                 handle_connection(server_fd, 0);
580 }
581
582 static int accept_loop(int listen_sk)
583 {
584         struct sockaddr_in addr;
585         fio_socklen_t len = sizeof(addr);
586         struct pollfd pfd;
587         int ret, sk, flags, exitval = 0;
588
589         dprint(FD_NET, "server enter accept loop\n");
590
591         flags = fcntl(listen_sk, F_GETFL);
592         flags |= O_NONBLOCK;
593         fcntl(listen_sk, F_SETFL, flags);
594 again:
595         pfd.fd = listen_sk;
596         pfd.events = POLLIN;
597         do {
598                 ret = poll(&pfd, 1, 100);
599                 if (ret < 0) {
600                         if (errno == EINTR)
601                                 break;
602                         log_err("fio: poll: %s\n", strerror(errno));
603                         goto out;
604                 } else if (!ret)
605                         continue;
606
607                 if (pfd.revents & POLLIN)
608                         break;
609         } while (!exit_backend);
610
611         if (exit_backend)
612                 goto out;
613
614         sk = accept(listen_sk, (struct sockaddr *) &addr, &len);
615         if (sk < 0) {
616                 log_err("fio: accept: %s\n", strerror(errno));
617                 return -1;
618         }
619
620         dprint(FD_NET, "server: connect from %s\n", inet_ntoa(addr.sin_addr));
621
622         server_fd = sk;
623
624         exitval = handle_connection(sk, 1);
625
626         server_fd = -1;
627         close(sk);
628
629         if (!exit_backend)
630                 goto again;
631
632 out:
633         return exitval;
634 }
635
636 int fio_server_text_output(int level, const char *buf, size_t len)
637 {
638         struct cmd_text_pdu *pdu;
639         unsigned int tlen;
640         struct timeval tv;
641
642         if (server_fd == -1)
643                 return log_local_buf(buf, len);
644
645         tlen = sizeof(*pdu) + len;
646         pdu = malloc(tlen);
647
648         pdu->level      = __cpu_to_le32(level);
649         pdu->buf_len    = __cpu_to_le32(len);
650
651         gettimeofday(&tv, NULL);
652         pdu->log_sec    = __cpu_to_le64(tv.tv_sec);
653         pdu->log_usec   = __cpu_to_le64(tv.tv_usec);
654
655         memcpy(pdu->buf, buf, len);
656
657         fio_net_send_cmd(server_fd, FIO_NET_CMD_TEXT, pdu, tlen, 0);
658         free(pdu);
659         return len;
660 }
661
662 static void convert_io_stat(struct io_stat *dst, struct io_stat *src)
663 {
664         dst->max_val    = cpu_to_le64(src->max_val);
665         dst->min_val    = cpu_to_le64(src->min_val);
666         dst->samples    = cpu_to_le64(src->samples);
667
668         /*
669          * Encode to IEEE 754 for network transfer
670          */
671         dst->mean.u.i   = __cpu_to_le64(fio_double_to_uint64(src->mean.u.f));
672         dst->S.u.i      = __cpu_to_le64(fio_double_to_uint64(src->S.u.f));
673 }
674
675 static void convert_gs(struct group_run_stats *dst, struct group_run_stats *src)
676 {
677         int i;
678
679         for (i = 0; i < 2; i++) {
680                 dst->max_run[i]         = cpu_to_le64(src->max_run[i]);
681                 dst->min_run[i]         = cpu_to_le64(src->min_run[i]);
682                 dst->max_bw[i]          = cpu_to_le64(src->max_bw[i]);
683                 dst->min_bw[i]          = cpu_to_le64(src->min_bw[i]);
684                 dst->io_kb[i]           = cpu_to_le64(src->io_kb[i]);
685                 dst->agg[i]             = cpu_to_le64(src->agg[i]);
686         }
687
688         dst->kb_base    = cpu_to_le32(src->kb_base);
689         dst->groupid    = cpu_to_le32(src->groupid);
690 }
691
692 /*
693  * Send a CMD_TS, which packs struct thread_stat and group_run_stats
694  * into a single payload.
695  */
696 void fio_server_send_ts(struct thread_stat *ts, struct group_run_stats *rs)
697 {
698         struct cmd_ts_pdu p;
699         int i, j;
700
701         dprint(FD_NET, "server sending end stats\n");
702
703         memset(&p, 0, sizeof(p));
704
705         strcpy(p.ts.name, ts->name);
706         strcpy(p.ts.verror, ts->verror);
707         strcpy(p.ts.description, ts->description);
708
709         p.ts.error      = cpu_to_le32(ts->error);
710         p.ts.groupid    = cpu_to_le32(ts->groupid);
711         p.ts.pid        = cpu_to_le32(ts->pid);
712         p.ts.members    = cpu_to_le32(ts->members);
713
714         for (i = 0; i < 2; i++) {
715                 convert_io_stat(&p.ts.clat_stat[i], &ts->clat_stat[i]);
716                 convert_io_stat(&p.ts.slat_stat[i], &ts->slat_stat[i]);
717                 convert_io_stat(&p.ts.lat_stat[i], &ts->lat_stat[i]);
718                 convert_io_stat(&p.ts.bw_stat[i], &ts->bw_stat[i]);
719         }
720
721         p.ts.usr_time           = cpu_to_le64(ts->usr_time);
722         p.ts.sys_time           = cpu_to_le64(ts->sys_time);
723         p.ts.ctx                = cpu_to_le64(ts->ctx);
724         p.ts.minf               = cpu_to_le64(ts->minf);
725         p.ts.majf               = cpu_to_le64(ts->majf);
726         p.ts.clat_percentiles   = cpu_to_le64(ts->clat_percentiles);
727
728         for (i = 0; i < FIO_IO_U_LIST_MAX_LEN; i++) {
729                 fio_fp64_t *src = &ts->percentile_list[i];
730                 fio_fp64_t *dst = &p.ts.percentile_list[i];
731
732                 dst->u.i = __cpu_to_le64(fio_double_to_uint64(src->u.f));
733         }
734
735         for (i = 0; i < FIO_IO_U_MAP_NR; i++) {
736                 p.ts.io_u_map[i]        = cpu_to_le32(ts->io_u_map[i]);
737                 p.ts.io_u_submit[i]     = cpu_to_le32(ts->io_u_submit[i]);
738                 p.ts.io_u_complete[i]   = cpu_to_le32(ts->io_u_complete[i]);
739         }
740
741         for (i = 0; i < FIO_IO_U_LAT_U_NR; i++) {
742                 p.ts.io_u_lat_u[i]      = cpu_to_le32(ts->io_u_lat_u[i]);
743                 p.ts.io_u_lat_m[i]      = cpu_to_le32(ts->io_u_lat_m[i]);
744         }
745
746         for (i = 0; i < 2; i++)
747                 for (j = 0; j < FIO_IO_U_PLAT_NR; j++)
748                         p.ts.io_u_plat[i][j] = cpu_to_le32(ts->io_u_plat[i][j]);
749
750         for (i = 0; i < 3; i++) {
751                 p.ts.total_io_u[i]      = cpu_to_le64(ts->total_io_u[i]);
752                 p.ts.short_io_u[i]      = cpu_to_le64(ts->short_io_u[i]);
753         }
754
755         p.ts.total_submit       = cpu_to_le64(ts->total_submit);
756         p.ts.total_complete     = cpu_to_le64(ts->total_complete);
757
758         for (i = 0; i < 2; i++) {
759                 p.ts.io_bytes[i]        = cpu_to_le64(ts->io_bytes[i]);
760                 p.ts.runtime[i]         = cpu_to_le64(ts->runtime[i]);
761         }
762
763         p.ts.total_run_time     = cpu_to_le64(ts->total_run_time);
764         p.ts.continue_on_error  = cpu_to_le16(ts->continue_on_error);
765         p.ts.total_err_count    = cpu_to_le64(ts->total_err_count);
766         p.ts.first_error        = cpu_to_le32(ts->first_error);
767         p.ts.kb_base            = cpu_to_le32(ts->kb_base);
768
769         convert_gs(&p.rs, rs);
770
771         fio_net_send_cmd(server_fd, FIO_NET_CMD_TS, &p, sizeof(p), 0);
772 }
773
774 void fio_server_send_gs(struct group_run_stats *rs)
775 {
776         struct group_run_stats gs;
777
778         dprint(FD_NET, "server sending group run stats\n");
779
780         convert_gs(&gs, rs);
781         fio_net_send_cmd(server_fd, FIO_NET_CMD_GS, &gs, sizeof(gs), 0);
782 }
783
784 static void convert_agg(struct disk_util_agg *dst, struct disk_util_agg *src)
785 {
786         int i;
787
788         for (i = 0; i < 2; i++) {
789                 dst->ios[i]     = cpu_to_le32(src->ios[i]);
790                 dst->merges[i]  = cpu_to_le32(src->merges[i]);
791                 dst->sectors[i] = cpu_to_le64(src->sectors[i]);
792                 dst->ticks[i]   = cpu_to_le32(src->ticks[i]);
793         }
794
795         dst->io_ticks           = cpu_to_le32(src->io_ticks);
796         dst->time_in_queue      = cpu_to_le32(src->time_in_queue);
797         dst->slavecount         = cpu_to_le32(src->slavecount);
798         dst->max_util.u.i       = __cpu_to_le64(fio_double_to_uint64(src->max_util.u.f));
799 }
800
801 static void convert_dus(struct disk_util_stat *dst, struct disk_util_stat *src)
802 {
803         int i;
804
805         strcpy((char *) dst->name, (char *) src->name);
806
807         for (i = 0; i < 2; i++) {
808                 dst->ios[i]     = cpu_to_le32(src->ios[i]);
809                 dst->merges[i]  = cpu_to_le32(src->merges[i]);
810                 dst->sectors[i] = cpu_to_le64(src->sectors[i]);
811                 dst->ticks[i]   = cpu_to_le32(src->ticks[i]);
812         }
813
814         dst->io_ticks           = cpu_to_le32(src->io_ticks);
815         dst->time_in_queue      = cpu_to_le32(src->time_in_queue);
816         dst->msec               = cpu_to_le64(src->msec);
817 }
818
819 void fio_server_send_du(void)
820 {
821         struct disk_util *du;
822         struct flist_head *entry;
823         struct cmd_du_pdu pdu;
824
825         dprint(FD_NET, "server: sending disk_util %d\n", !flist_empty(&disk_list));
826
827         memset(&pdu, 0, sizeof(pdu));
828
829         flist_for_each(entry, &disk_list) {
830                 du = flist_entry(entry, struct disk_util, list);
831
832                 convert_dus(&pdu.dus, &du->dus);
833                 convert_agg(&pdu.agg, &du->agg);
834
835                 fio_net_send_cmd(server_fd, FIO_NET_CMD_DU, &pdu, sizeof(pdu), 0);
836         }
837 }
838
839 void fio_server_send_add_job(struct thread_options *o, const char *ioengine)
840 {
841         struct cmd_add_job_pdu pdu;
842
843         convert_thread_options_to_net(&pdu.top, o);
844
845         fio_net_send_cmd(server_fd, FIO_NET_CMD_ADD_JOB, &pdu, sizeof(pdu), 0);
846 }
847
848 static int fio_init_server_ip(void)
849 {
850         struct sockaddr *addr;
851         fio_socklen_t socklen;
852         int sk, opt;
853
854         if (use_ipv6)
855                 sk = socket(AF_INET6, SOCK_STREAM, 0);
856         else
857                 sk = socket(AF_INET, SOCK_STREAM, 0);
858
859         if (sk < 0) {
860                 log_err("fio: socket: %s\n", strerror(errno));
861                 return -1;
862         }
863
864         opt = 1;
865         if (setsockopt(sk, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) {
866                 log_err("fio: setsockopt: %s\n", strerror(errno));
867                 close(sk);
868                 return -1;
869         }
870 #ifdef SO_REUSEPORT
871         if (setsockopt(sk, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt)) < 0) {
872                 log_err("fio: setsockopt: %s\n", strerror(errno));
873                 close(sk);
874                 return -1;
875         }
876 #endif
877
878         if (use_ipv6) {
879                 addr = (struct sockaddr *) &saddr_in6;
880                 socklen = sizeof(saddr_in6);
881                 saddr_in6.sin6_family = AF_INET6;
882         } else {
883                 addr = (struct sockaddr *) &saddr_in;
884                 socklen = sizeof(saddr_in);
885                 saddr_in.sin_family = AF_INET;
886         }
887
888         if (bind(sk, addr, socklen) < 0) {
889                 log_err("fio: bind: %s\n", strerror(errno));
890                 close(sk);
891                 return -1;
892         }
893
894         return sk;
895 }
896
897 static int fio_init_server_sock(void)
898 {
899         struct sockaddr_un addr;
900         fio_socklen_t len;
901         mode_t mode;
902         int sk;
903
904         sk = socket(AF_UNIX, SOCK_STREAM, 0);
905         if (sk < 0) {
906                 log_err("fio: socket: %s\n", strerror(errno));
907                 return -1;
908         }
909
910         mode = umask(000);
911
912         memset(&addr, 0, sizeof(addr));
913         addr.sun_family = AF_UNIX;
914         strcpy(addr.sun_path, bind_sock);
915         unlink(bind_sock);
916
917         len = sizeof(addr.sun_family) + strlen(bind_sock) + 1;
918
919         if (bind(sk, (struct sockaddr *) &addr, len) < 0) {
920                 log_err("fio: bind: %s\n", strerror(errno));
921                 close(sk);
922                 return -1;
923         }
924
925         umask(mode);
926         return sk;
927 }
928
929 static int fio_init_server_connection(void)
930 {
931         char bind_str[128];
932         int sk;
933
934         dprint(FD_NET, "starting server\n");
935
936         if (!bind_sock)
937                 sk = fio_init_server_ip();
938         else
939                 sk = fio_init_server_sock();
940
941         if (sk < 0)
942                 return sk;
943
944         if (!bind_sock) {
945                 char *p, port[16];
946                 const void *src;
947                 int af;
948
949                 if (use_ipv6) {
950                         af = AF_INET6;
951                         src = &saddr_in6.sin6_addr;
952                 } else {
953                         af = AF_INET;
954                         src = &saddr_in.sin_addr;
955                 }
956
957                 p = (char *) inet_ntop(af, src, bind_str, sizeof(bind_str));
958
959                 sprintf(port, ",%u", fio_net_port);
960                 if (p)
961                         strcat(p, port);
962                 else
963                         strcpy(bind_str, port);
964         } else
965                 strcpy(bind_str, bind_sock);
966
967         log_info("fio: server listening on %s\n", bind_str);
968
969         if (listen(sk, 0) < 0) {
970                 log_err("fio: listen: %s\n", strerror(errno));
971                 return -1;
972         }
973
974         return sk;
975 }
976
977 int fio_server_parse_host(const char *host, int *ipv6, struct in_addr *inp,
978                           struct in6_addr *inp6)
979
980 {
981         int ret = 0;
982
983         if (*ipv6)
984                 ret = inet_pton(AF_INET6, host, inp6);
985         else
986                 ret = inet_pton(AF_INET, host, inp);
987
988         if (ret != 1) {
989                 struct hostent *hent;
990
991                 hent = gethostbyname(host);
992                 if (!hent) {
993                         log_err("fio: failed to resolve <%s>\n", host);
994                         return 0;
995                 }
996
997                 if (*ipv6) {
998                         if (hent->h_addrtype != AF_INET6) {
999                                 log_info("fio: falling back to IPv4\n");
1000                                 *ipv6 = 0;
1001                         } else
1002                                 memcpy(inp6, hent->h_addr_list[0], 16);
1003                 }
1004                 if (!*ipv6) {
1005                         if (hent->h_addrtype != AF_INET) {
1006                                 log_err("fio: lookup type mismatch\n");
1007                                 return 0;
1008                         }
1009                         memcpy(inp, hent->h_addr_list[0], 4);
1010                 }
1011                 ret = 1;
1012         }
1013
1014         return !(ret == 1);
1015 }
1016
1017 /*
1018  * Parse a host/ip/port string. Reads from 'str'.
1019  *
1020  * Outputs:
1021  *
1022  * For IPv4:
1023  *      *ptr is the host, *port is the port, inp is the destination.
1024  * For IPv6:
1025  *      *ptr is the host, *port is the port, inp6 is the dest, and *ipv6 is 1.
1026  * For local domain sockets:
1027  *      *ptr is the filename, *is_sock is 1.
1028  */
1029 int fio_server_parse_string(const char *str, char **ptr, int *is_sock,
1030                             int *port, struct in_addr *inp,
1031                             struct in6_addr *inp6, int *ipv6)
1032 {
1033         const char *host = str;
1034         char *portp;
1035         int lport = 0;
1036
1037         *ptr = NULL;
1038         *is_sock = 0;
1039         *port = fio_net_port;
1040         *ipv6 = 0;
1041
1042         if (!strncmp(str, "sock:", 5)) {
1043                 *ptr = strdup(str + 5);
1044                 *is_sock = 1;
1045
1046                 return 0;
1047         }
1048
1049         /*
1050          * Is it ip:<ip or host>:port
1051          */
1052         if (!strncmp(host, "ip:", 3))
1053                 host += 3;
1054         else if (!strncmp(host, "ip4:", 4))
1055                 host += 4;
1056         else if (!strncmp(host, "ip6:", 4)) {
1057                 host += 4;
1058                 *ipv6 = 1;
1059         } else if (host[0] == ':') {
1060                 /* String is :port */
1061                 host++;
1062                 lport = atoi(host);
1063                 if (!lport || lport > 65535) {
1064                         log_err("fio: bad server port %u\n", port);
1065                         return 1;
1066                 }
1067                 /* no hostname given, we are done */
1068                 *port = lport;
1069                 return 0;
1070         }
1071
1072         /*
1073          * If no port seen yet, check if there's a last ':' at the end
1074          */
1075         if (!lport) {
1076                 portp = strchr(host, ',');
1077                 if (portp) {
1078                         *portp = '\0';
1079                         portp++;
1080                         lport = atoi(portp);
1081                         if (!lport || lport > 65535) {
1082                                 log_err("fio: bad server port %u\n", port);
1083                                 return 1;
1084                         }
1085                 }
1086         }
1087
1088         if (lport)
1089                 *port = lport;
1090
1091         if (!strlen(host))
1092                 return 0;
1093
1094         *ptr = strdup(host);
1095
1096         if (fio_server_parse_host(*ptr, ipv6, inp, inp6)) {
1097                 free(*ptr);
1098                 *ptr = NULL;
1099                 return 1;
1100         }
1101
1102         if (*port == 0)
1103                 *port = fio_net_port;
1104
1105         return 0;
1106 }
1107
1108 /*
1109  * Server arg should be one of:
1110  *
1111  * sock:/path/to/socket
1112  *   ip:1.2.3.4
1113  *      1.2.3.4
1114  *
1115  * Where sock uses unix domain sockets, and ip binds the server to
1116  * a specific interface. If no arguments are given to the server, it
1117  * uses IP and binds to 0.0.0.0.
1118  *
1119  */
1120 static int fio_handle_server_arg(void)
1121 {
1122         int port = fio_net_port;
1123         int is_sock, ret = 0;
1124
1125         saddr_in.sin_addr.s_addr = htonl(INADDR_ANY);
1126
1127         if (!fio_server_arg)
1128                 goto out;
1129
1130         ret = fio_server_parse_string(fio_server_arg, &bind_sock, &is_sock,
1131                                         &port, &saddr_in.sin_addr,
1132                                         &saddr_in6.sin6_addr, &use_ipv6);
1133
1134         if (!is_sock && bind_sock) {
1135                 free(bind_sock);
1136                 bind_sock = NULL;
1137         }
1138
1139 out:
1140         fio_net_port = port;
1141         saddr_in.sin_port = htons(port);
1142         saddr_in6.sin6_port = htons(port);
1143         return ret;
1144 }
1145
1146 static int fio_server(void)
1147 {
1148         int sk, ret;
1149
1150         dprint(FD_NET, "starting server\n");
1151
1152         if (fio_handle_server_arg())
1153                 return -1;
1154
1155         sk = fio_init_server_connection();
1156         if (sk < 0)
1157                 return -1;
1158
1159         ret = accept_loop(sk);
1160
1161         close(sk);
1162
1163         if (fio_server_arg) {
1164                 free(fio_server_arg);
1165                 fio_server_arg = NULL;
1166         }
1167         if (bind_sock)
1168                 free(bind_sock);
1169
1170         return ret;
1171 }
1172
1173 void fio_server_got_signal(int signal)
1174 {
1175         if (signal == SIGPIPE)
1176                 server_fd = -1;
1177         else {
1178                 log_info("\nfio: terminating on signal %d\n", signal);
1179                 exit_backend = 1;
1180         }
1181 }
1182
1183 static int check_existing_pidfile(const char *pidfile)
1184 {
1185         struct stat sb;
1186         char buf[16];
1187         pid_t pid;
1188         FILE *f;
1189
1190         if (stat(pidfile, &sb))
1191                 return 0;
1192
1193         f = fopen(pidfile, "r");
1194         if (!f)
1195                 return 0;
1196
1197         if (fread(buf, sb.st_size, 1, f) <= 0) {
1198                 fclose(f);
1199                 return 1;
1200         }
1201         fclose(f);
1202
1203         pid = atoi(buf);
1204         if (kill(pid, SIGCONT) < 0)
1205                 return errno != ESRCH;
1206
1207         return 1;
1208 }
1209
1210 static int write_pid(pid_t pid, const char *pidfile)
1211 {
1212         FILE *fpid;
1213
1214         fpid = fopen(pidfile, "w");
1215         if (!fpid) {
1216                 log_err("fio: failed opening pid file %s\n", pidfile);
1217                 return 1;
1218         }
1219
1220         fprintf(fpid, "%u\n", (unsigned int) pid);
1221         fclose(fpid);
1222         return 0;
1223 }
1224
1225 /*
1226  * If pidfile is specified, background us.
1227  */
1228 int fio_start_server(char *pidfile)
1229 {
1230         pid_t pid;
1231         int ret;
1232
1233 #if defined(WIN32)
1234     WSADATA wsd;
1235     WSAStartup(MAKEWORD(2,2), &wsd);
1236 #endif
1237
1238         if (!pidfile)
1239                 return fio_server();
1240
1241         if (check_existing_pidfile(pidfile)) {
1242                 log_err("fio: pidfile %s exists and server appears alive\n",
1243                                                                 pidfile);
1244                 return -1;
1245         }
1246
1247         pid = fork();
1248         if (pid < 0) {
1249                 log_err("fio: failed server fork: %s", strerror(errno));
1250                 free(pidfile);
1251                 return -1;
1252         } else if (pid) {
1253                 int ret = write_pid(pid, pidfile);
1254
1255                 exit(ret);
1256         }
1257
1258         setsid();
1259         openlog("fio", LOG_NDELAY|LOG_NOWAIT|LOG_PID, LOG_USER);
1260         log_syslog = 1;
1261         close(STDIN_FILENO);
1262         close(STDOUT_FILENO);
1263         close(STDERR_FILENO);
1264         f_out = NULL;
1265         f_err = NULL;
1266
1267         ret = fio_server();
1268
1269         closelog();
1270         unlink(pidfile);
1271         free(pidfile);
1272         return ret;
1273 }
1274
1275 void fio_server_set_arg(const char *arg)
1276 {
1277         fio_server_arg = strdup(arg);
1278 }