server: cleanup and proper error returns
[fio.git] / server.c
1 #include <stdio.h>
2 #include <stdlib.h>
3 #include <stdarg.h>
4 #include <unistd.h>
5 #include <limits.h>
6 #include <errno.h>
7 #include <sys/poll.h>
8 #include <sys/types.h>
9 #include <sys/wait.h>
10 #include <sys/socket.h>
11 #include <sys/stat.h>
12 #include <sys/un.h>
13 #include <sys/uio.h>
14 #include <netinet/in.h>
15 #include <arpa/inet.h>
16 #include <netdb.h>
17 #include <syslog.h>
18 #include <signal.h>
19 #ifdef CONFIG_ZLIB
20 #include <zlib.h>
21 #endif
22
23 #include "fio.h"
24 #include "options.h"
25 #include "server.h"
26 #include "crc/crc16.h"
27 #include "lib/ieee754.h"
28 #include "verify.h"
29 #include "smalloc.h"
30
31 int fio_net_port = FIO_NET_PORT;
32
33 int exit_backend = 0;
34
35 enum {
36         SK_F_FREE       = 1,
37         SK_F_COPY       = 2,
38         SK_F_SIMPLE     = 4,
39         SK_F_VEC        = 8,
40 };
41
42 struct sk_entry {
43         struct flist_head list;
44         int opcode;
45         void *buf;
46         off_t size;
47         uint64_t *tagptr;
48         int flags;
49         struct flist_head next;
50 };
51
52 struct sk_out {
53         int sk;
54         struct fio_mutex *lock;
55         struct flist_head list;
56         struct fio_mutex *wait;
57 };
58
59 static char *fio_server_arg;
60 static char *bind_sock;
61 static struct sockaddr_in saddr_in;
62 static struct sockaddr_in6 saddr_in6;
63 static int use_ipv6;
64 #ifdef CONFIG_ZLIB
65 static unsigned int has_zlib = 1;
66 #else
67 static unsigned int has_zlib = 0;
68 #endif
69 static unsigned int use_zlib;
70 static char me[128];
71
72 static pthread_key_t sk_out_key;
73
74 struct fio_fork_item {
75         struct flist_head list;
76         int exitval;
77         int signal;
78         int exited;
79         pid_t pid;
80 };
81
82 struct cmd_reply {
83         struct fio_mutex lock;
84         void *data;
85         size_t size;
86         int error;
87 };
88
89 static const char *fio_server_ops[FIO_NET_CMD_NR] = {
90         "",
91         "QUIT",
92         "EXIT",
93         "JOB",
94         "JOBLINE",
95         "TEXT",
96         "TS",
97         "GS",
98         "SEND_ETA",
99         "ETA",
100         "PROBE",
101         "START",
102         "STOP",
103         "DISK_UTIL",
104         "SERVER_START",
105         "ADD_JOB",
106         "RUN",
107         "IOLOG",
108         "UPDATE_JOB",
109         "LOAD_FILE",
110         "VTRIGGER",
111         "SENDFILE",
112 };
113
114 static void sk_lock(struct sk_out *sk_out)
115 {
116         fio_mutex_down(sk_out->lock);
117 }
118
119 static void sk_unlock(struct sk_out *sk_out)
120 {
121         fio_mutex_up(sk_out->lock);
122 }
123
124 const char *fio_server_op(unsigned int op)
125 {
126         static char buf[32];
127
128         if (op < FIO_NET_CMD_NR)
129                 return fio_server_ops[op];
130
131         sprintf(buf, "UNKNOWN/%d", op);
132         return buf;
133 }
134
135 static ssize_t iov_total_len(const struct iovec *iov, int count)
136 {
137         ssize_t ret = 0;
138
139         while (count--) {
140                 ret += iov->iov_len;
141                 iov++;
142         }
143
144         return ret;
145 }
146
147 static int fio_sendv_data(int sk, struct iovec *iov, int count)
148 {
149         ssize_t total_len = iov_total_len(iov, count);
150         ssize_t ret;
151
152         do {
153                 ret = writev(sk, iov, count);
154                 if (ret > 0) {
155                         total_len -= ret;
156                         if (!total_len)
157                                 break;
158
159                         while (ret) {
160                                 if (ret >= iov->iov_len) {
161                                         ret -= iov->iov_len;
162                                         iov++;
163                                         continue;
164                                 }
165                                 iov->iov_base += ret;
166                                 iov->iov_len -= ret;
167                                 ret = 0;
168                         }
169                 } else if (!ret)
170                         break;
171                 else if (errno == EAGAIN || errno == EINTR)
172                         continue;
173                 else
174                         break;
175         } while (!exit_backend);
176
177         if (!total_len)
178                 return 0;
179
180         return 1;
181 }
182
183 int fio_send_data(int sk, const void *p, unsigned int len)
184 {
185         struct iovec iov = { .iov_base = (void *) p, .iov_len = len };
186
187         assert(len <= sizeof(struct fio_net_cmd) + FIO_SERVER_MAX_FRAGMENT_PDU);
188
189         return fio_sendv_data(sk, &iov, 1);
190 }
191
192 int fio_recv_data(int sk, void *p, unsigned int len)
193 {
194         do {
195                 int ret = recv(sk, p, len, MSG_WAITALL);
196
197                 if (ret > 0) {
198                         len -= ret;
199                         if (!len)
200                                 break;
201                         p += ret;
202                         continue;
203                 } else if (!ret)
204                         break;
205                 else if (errno == EAGAIN || errno == EINTR)
206                         continue;
207                 else
208                         break;
209         } while (!exit_backend);
210
211         if (!len)
212                 return 0;
213
214         return -1;
215 }
216
217 static int verify_convert_cmd(struct fio_net_cmd *cmd)
218 {
219         uint16_t crc;
220
221         cmd->cmd_crc16 = le16_to_cpu(cmd->cmd_crc16);
222         cmd->pdu_crc16 = le16_to_cpu(cmd->pdu_crc16);
223
224         crc = fio_crc16(cmd, FIO_NET_CMD_CRC_SZ);
225         if (crc != cmd->cmd_crc16) {
226                 log_err("fio: server bad crc on command (got %x, wanted %x)\n",
227                                 cmd->cmd_crc16, crc);
228                 return 1;
229         }
230
231         cmd->version    = le16_to_cpu(cmd->version);
232         cmd->opcode     = le16_to_cpu(cmd->opcode);
233         cmd->flags      = le32_to_cpu(cmd->flags);
234         cmd->tag        = le64_to_cpu(cmd->tag);
235         cmd->pdu_len    = le32_to_cpu(cmd->pdu_len);
236
237         switch (cmd->version) {
238         case FIO_SERVER_VER:
239                 break;
240         default:
241                 log_err("fio: bad server cmd version %d\n", cmd->version);
242                 return 1;
243         }
244
245         if (cmd->pdu_len > FIO_SERVER_MAX_FRAGMENT_PDU) {
246                 log_err("fio: command payload too large: %u\n", cmd->pdu_len);
247                 return 1;
248         }
249
250         return 0;
251 }
252
253 /*
254  * Read (and defragment, if necessary) incoming commands
255  */
256 struct fio_net_cmd *fio_net_recv_cmd(int sk)
257 {
258         struct fio_net_cmd cmd, *tmp, *cmdret = NULL;
259         size_t cmd_size = 0, pdu_offset = 0;
260         uint16_t crc;
261         int ret, first = 1;
262         void *pdu = NULL;
263
264         do {
265                 ret = fio_recv_data(sk, &cmd, sizeof(cmd));
266                 if (ret)
267                         break;
268
269                 /* We have a command, verify it and swap if need be */
270                 ret = verify_convert_cmd(&cmd);
271                 if (ret)
272                         break;
273
274                 if (first) {
275                         /* if this is text, add room for \0 at the end */
276                         cmd_size = sizeof(cmd) + cmd.pdu_len + 1;
277                         assert(!cmdret);
278                 } else
279                         cmd_size += cmd.pdu_len;
280
281                 if (cmd_size / 1024 > FIO_SERVER_MAX_CMD_MB * 1024) {
282                         log_err("fio: cmd+pdu too large (%llu)\n", (unsigned long long) cmd_size);
283                         ret = 1;
284                         break;
285                 }
286
287                 tmp = realloc(cmdret, cmd_size);
288                 if (!tmp) {
289                         log_err("fio: server failed allocating cmd\n");
290                         ret = 1;
291                         break;
292                 }
293                 cmdret = tmp;
294
295                 if (first)
296                         memcpy(cmdret, &cmd, sizeof(cmd));
297                 else if (cmdret->opcode != cmd.opcode) {
298                         log_err("fio: fragment opcode mismatch (%d != %d)\n",
299                                         cmdret->opcode, cmd.opcode);
300                         ret = 1;
301                         break;
302                 }
303
304                 if (!cmd.pdu_len)
305                         break;
306
307                 /* There's payload, get it */
308                 pdu = (void *) cmdret->payload + pdu_offset;
309                 ret = fio_recv_data(sk, pdu, cmd.pdu_len);
310                 if (ret)
311                         break;
312
313                 /* Verify payload crc */
314                 crc = fio_crc16(pdu, cmd.pdu_len);
315                 if (crc != cmd.pdu_crc16) {
316                         log_err("fio: server bad crc on payload ");
317                         log_err("(got %x, wanted %x)\n", cmd.pdu_crc16, crc);
318                         ret = 1;
319                         break;
320                 }
321
322                 pdu_offset += cmd.pdu_len;
323                 if (!first)
324                         cmdret->pdu_len += cmd.pdu_len;
325                 first = 0;
326         } while (cmd.flags & FIO_NET_CMD_F_MORE);
327
328         if (ret) {
329                 free(cmdret);
330                 cmdret = NULL;
331         } else if (cmdret) {
332                 /* zero-terminate text input */
333                 if (cmdret->pdu_len) {
334                         if (cmdret->opcode == FIO_NET_CMD_TEXT) {
335                                 struct cmd_text_pdu *__pdu = (struct cmd_text_pdu *) cmdret->payload;
336                                 char *buf = (char *) __pdu->buf;
337
338                                 buf[__pdu->buf_len] = '\0';
339                         } else if (cmdret->opcode == FIO_NET_CMD_JOB) {
340                                 struct cmd_job_pdu *__pdu = (struct cmd_job_pdu *) cmdret->payload;
341                                 char *buf = (char *) __pdu->buf;
342                                 int len = le32_to_cpu(__pdu->buf_len);
343
344                                 buf[len] = '\0';
345                         }
346                 }
347
348                 /* frag flag is internal */
349                 cmdret->flags &= ~FIO_NET_CMD_F_MORE;
350         }
351
352         return cmdret;
353 }
354
355 static void add_reply(uint64_t tag, struct flist_head *list)
356 {
357         struct fio_net_cmd_reply *reply;
358
359         reply = (struct fio_net_cmd_reply *) (uintptr_t) tag;
360         flist_add_tail(&reply->list, list);
361 }
362
363 static uint64_t alloc_reply(uint64_t tag, uint16_t opcode)
364 {
365         struct fio_net_cmd_reply *reply;
366
367         reply = calloc(1, sizeof(*reply));
368         INIT_FLIST_HEAD(&reply->list);
369         fio_gettime(&reply->tv, NULL);
370         reply->saved_tag = tag;
371         reply->opcode = opcode;
372
373         return (uintptr_t) reply;
374 }
375
376 static void free_reply(uint64_t tag)
377 {
378         struct fio_net_cmd_reply *reply;
379
380         reply = (struct fio_net_cmd_reply *) (uintptr_t) tag;
381         free(reply);
382 }
383
384 void fio_net_cmd_crc_pdu(struct fio_net_cmd *cmd, const void *pdu)
385 {
386         uint32_t pdu_len;
387
388         cmd->cmd_crc16 = __cpu_to_le16(fio_crc16(cmd, FIO_NET_CMD_CRC_SZ));
389
390         pdu_len = le32_to_cpu(cmd->pdu_len);
391         cmd->pdu_crc16 = __cpu_to_le16(fio_crc16(pdu, pdu_len));
392 }
393
394 void fio_net_cmd_crc(struct fio_net_cmd *cmd)
395 {
396         fio_net_cmd_crc_pdu(cmd, cmd->payload);
397 }
398
399 int fio_net_send_cmd(int fd, uint16_t opcode, const void *buf, off_t size,
400                      uint64_t *tagptr, struct flist_head *list)
401 {
402         struct fio_net_cmd *cmd = NULL;
403         size_t this_len, cur_len = 0;
404         uint64_t tag;
405         int ret;
406
407         if (list) {
408                 assert(tagptr);
409                 tag = *tagptr = alloc_reply(*tagptr, opcode);
410         } else
411                 tag = tagptr ? *tagptr : 0;
412
413         do {
414                 this_len = size;
415                 if (this_len > FIO_SERVER_MAX_FRAGMENT_PDU)
416                         this_len = FIO_SERVER_MAX_FRAGMENT_PDU;
417
418                 if (!cmd || cur_len < sizeof(*cmd) + this_len) {
419                         if (cmd)
420                                 free(cmd);
421
422                         cur_len = sizeof(*cmd) + this_len;
423                         cmd = malloc(cur_len);
424                 }
425
426                 fio_init_net_cmd(cmd, opcode, buf, this_len, tag);
427
428                 if (this_len < size)
429                         cmd->flags = __cpu_to_le32(FIO_NET_CMD_F_MORE);
430
431                 fio_net_cmd_crc(cmd);
432
433                 ret = fio_send_data(fd, cmd, sizeof(*cmd) + this_len);
434                 size -= this_len;
435                 buf += this_len;
436         } while (!ret && size);
437
438         if (list) {
439                 if (ret)
440                         free_reply(tag);
441                 else
442                         add_reply(tag, list);
443         }
444
445         if (cmd)
446                 free(cmd);
447
448         return ret;
449 }
450
451 struct sk_entry *fio_net_prep_cmd(uint16_t opcode, void *buf, off_t size,
452                                   uint64_t *tagptr, int flags)
453 {
454         struct sk_entry *entry;
455
456         entry = smalloc(sizeof(*entry));
457         INIT_FLIST_HEAD(&entry->next);
458         entry->opcode = opcode;
459         if (flags & SK_F_COPY) {
460                 entry->buf = smalloc(size);
461                 memcpy(entry->buf, buf, size);
462         } else
463                 entry->buf = buf;
464         entry->size = size;
465         entry->tagptr = tagptr;
466         entry->flags = flags;
467
468         return entry;
469 }
470
471 static void fio_net_queue_entry(struct sk_entry *entry)
472 {
473         struct sk_out *sk_out = pthread_getspecific(sk_out_key);
474
475         sk_lock(sk_out);
476         flist_add_tail(&entry->list, &sk_out->list);
477         sk_unlock(sk_out);
478
479         fio_mutex_up(sk_out->wait);
480 }
481
482 static int fio_net_queue_cmd(uint16_t opcode, void *buf, off_t size,
483                              uint64_t *tagptr, int flags)
484 {
485         struct sk_entry *entry;
486
487         entry = fio_net_prep_cmd(opcode, buf, size, tagptr, flags);
488         fio_net_queue_entry(entry);
489         return 0;
490 }
491
492 static int fio_net_send_simple_stack_cmd(int sk, uint16_t opcode, uint64_t tag)
493 {
494         struct fio_net_cmd cmd;
495
496         fio_init_net_cmd(&cmd, opcode, NULL, 0, tag);
497         fio_net_cmd_crc(&cmd);
498
499         return fio_send_data(sk, &cmd, sizeof(cmd));
500 }
501
502 /*
503  * If 'list' is non-NULL, then allocate and store the sent command for
504  * later verification.
505  */
506 int fio_net_send_simple_cmd(int sk, uint16_t opcode, uint64_t tag,
507                             struct flist_head *list)
508 {
509         int ret;
510
511         if (list)
512                 tag = alloc_reply(tag, opcode);
513
514         ret = fio_net_send_simple_stack_cmd(sk, opcode, tag);
515         if (ret) {
516                 if (list)
517                         free_reply(tag);
518
519                 return ret;
520         }
521
522         if (list)
523                 add_reply(tag, list);
524
525         return 0;
526 }
527
528 static int fio_net_queue_quit(void)
529 {
530         dprint(FD_NET, "server: sending quit\n");
531
532         return fio_net_queue_cmd(FIO_NET_CMD_QUIT, NULL, 0, 0, SK_F_SIMPLE);
533 }
534
535 int fio_net_send_quit(int sk)
536 {
537         dprint(FD_NET, "server: sending quit\n");
538
539         return fio_net_send_simple_cmd(sk, FIO_NET_CMD_QUIT, 0, NULL);
540 }
541
542 static int fio_net_send_ack(struct fio_net_cmd *cmd, int error, int signal)
543 {
544         struct cmd_end_pdu epdu;
545         uint64_t tag = 0;
546
547         if (cmd)
548                 tag = cmd->tag;
549
550         epdu.error = __cpu_to_le32(error);
551         epdu.signal = __cpu_to_le32(signal);
552         return fio_net_queue_cmd(FIO_NET_CMD_STOP, &epdu, sizeof(epdu), &tag, SK_F_COPY);
553 }
554
555 static int fio_net_queue_stop(int error, int signal)
556 {
557         dprint(FD_NET, "server: sending stop (%d, %d)\n", error, signal);
558         return fio_net_send_ack(NULL, error, signal);
559 }
560
561 static void fio_server_add_fork_item(pid_t pid, struct flist_head *list)
562 {
563         struct fio_fork_item *ffi;
564
565         ffi = malloc(sizeof(*ffi));
566         ffi->exitval = 0;
567         ffi->signal = 0;
568         ffi->exited = 0;
569         ffi->pid = pid;
570         flist_add_tail(&ffi->list, list);
571 }
572
573 static void fio_server_add_conn_pid(struct flist_head *conn_list, pid_t pid)
574 {
575         dprint(FD_NET, "server: forked off connection job (pid=%u)\n", (int) pid);
576         fio_server_add_fork_item(pid, conn_list);
577 }
578
579 static void fio_server_add_job_pid(struct flist_head *job_list, pid_t pid)
580 {
581         dprint(FD_NET, "server: forked off job job (pid=%u)\n", (int) pid);
582         fio_server_add_fork_item(pid, job_list);
583 }
584
585 static void fio_server_check_fork_item(struct fio_fork_item *ffi)
586 {
587         int ret, status;
588
589         ret = waitpid(ffi->pid, &status, WNOHANG);
590         if (ret < 0) {
591                 if (errno == ECHILD) {
592                         log_err("fio: connection pid %u disappeared\n", (int) ffi->pid);
593                         ffi->exited = 1;
594                 } else
595                         log_err("fio: waitpid: %s\n", strerror(errno));
596         } else if (ret == ffi->pid) {
597                 if (WIFSIGNALED(status)) {
598                         ffi->signal = WTERMSIG(status);
599                         ffi->exited = 1;
600                 }
601                 if (WIFEXITED(status)) {
602                         if (WEXITSTATUS(status))
603                                 ffi->exitval = WEXITSTATUS(status);
604                         ffi->exited = 1;
605                 }
606         }
607 }
608
609 static void fio_server_fork_item_done(struct fio_fork_item *ffi, bool stop)
610 {
611         dprint(FD_NET, "pid %u exited, sig=%u, exitval=%d\n", (int) ffi->pid, ffi->signal, ffi->exitval);
612
613         /*
614          * Fold STOP and QUIT...
615          */
616         if (stop) {
617                 fio_net_queue_stop(ffi->exitval, ffi->signal);
618                 fio_net_queue_quit();
619         }
620
621         flist_del(&ffi->list);
622         free(ffi);
623 }
624
625 static void fio_server_check_fork_items(struct flist_head *list, bool stop)
626 {
627         struct flist_head *entry, *tmp;
628         struct fio_fork_item *ffi;
629
630         flist_for_each_safe(entry, tmp, list) {
631                 ffi = flist_entry(entry, struct fio_fork_item, list);
632
633                 fio_server_check_fork_item(ffi);
634
635                 if (ffi->exited)
636                         fio_server_fork_item_done(ffi, stop);
637         }
638 }
639
640 static void fio_server_check_jobs(struct flist_head *job_list)
641 {
642         fio_server_check_fork_items(job_list, true);
643 }
644
645 static void fio_server_check_conns(struct flist_head *conn_list)
646 {
647         fio_server_check_fork_items(conn_list, false);
648 }
649
650 static int handle_load_file_cmd(struct fio_net_cmd *cmd)
651 {
652         struct cmd_load_file_pdu *pdu = (struct cmd_load_file_pdu *) cmd->payload;
653         void *file_name = pdu->file;
654         struct cmd_start_pdu spdu;
655
656         dprint(FD_NET, "server: loading local file %s\n", (char *) file_name);
657
658         pdu->name_len = le16_to_cpu(pdu->name_len);
659         pdu->client_type = le16_to_cpu(pdu->client_type);
660
661         if (parse_jobs_ini(file_name, 0, 0, pdu->client_type)) {
662                 fio_net_queue_quit();
663                 return -1;
664         }
665
666         spdu.jobs = cpu_to_le32(thread_number);
667         spdu.stat_outputs = cpu_to_le32(stat_number);
668         fio_net_queue_cmd(FIO_NET_CMD_START, &spdu, sizeof(spdu), NULL, SK_F_COPY);
669         return 0;
670 }
671
672 static int handle_run_cmd(struct sk_out *sk_out, struct flist_head *job_list,
673                           struct fio_net_cmd *cmd)
674 {
675         struct backend_data data;
676         pid_t pid;
677         int ret;
678
679         fio_time_init();
680         set_genesis_time();
681
682         pid = fork();
683         if (pid) {
684                 fio_server_add_job_pid(job_list, pid);
685                 return 0;
686         }
687
688         data.key = sk_out_key;
689         data.ptr = sk_out;
690         //pthread_setspecific(sk_out_key, sk_out);
691         ret = fio_backend(&data);
692         free_threads_shm();
693         _exit(ret);
694 }
695
696 static int handle_job_cmd(struct fio_net_cmd *cmd)
697 {
698         struct cmd_job_pdu *pdu = (struct cmd_job_pdu *) cmd->payload;
699         void *buf = pdu->buf;
700         struct cmd_start_pdu spdu;
701
702         pdu->buf_len = le32_to_cpu(pdu->buf_len);
703         pdu->client_type = le32_to_cpu(pdu->client_type);
704
705         if (parse_jobs_ini(buf, 1, 0, pdu->client_type)) {
706                 fio_net_queue_quit();
707                 return -1;
708         }
709
710         spdu.jobs = cpu_to_le32(thread_number);
711         spdu.stat_outputs = cpu_to_le32(stat_number);
712
713         fio_net_queue_cmd(FIO_NET_CMD_START, &spdu, sizeof(spdu), NULL, SK_F_COPY);
714         return 0;
715 }
716
717 static int handle_jobline_cmd(struct fio_net_cmd *cmd)
718 {
719         void *pdu = cmd->payload;
720         struct cmd_single_line_pdu *cslp;
721         struct cmd_line_pdu *clp;
722         unsigned long offset;
723         struct cmd_start_pdu spdu;
724         char **argv;
725         int i;
726
727         clp = pdu;
728         clp->lines = le16_to_cpu(clp->lines);
729         clp->client_type = le16_to_cpu(clp->client_type);
730         argv = malloc(clp->lines * sizeof(char *));
731         offset = sizeof(*clp);
732
733         dprint(FD_NET, "server: %d command line args\n", clp->lines);
734
735         for (i = 0; i < clp->lines; i++) {
736                 cslp = pdu + offset;
737                 argv[i] = (char *) cslp->text;
738
739                 offset += sizeof(*cslp) + le16_to_cpu(cslp->len);
740                 dprint(FD_NET, "server: %d: %s\n", i, argv[i]);
741         }
742
743         if (parse_cmd_line(clp->lines, argv, clp->client_type)) {
744                 fio_net_queue_quit();
745                 free(argv);
746                 return -1;
747         }
748
749         free(argv);
750
751         spdu.jobs = cpu_to_le32(thread_number);
752         spdu.stat_outputs = cpu_to_le32(stat_number);
753
754         fio_net_queue_cmd(FIO_NET_CMD_START, &spdu, sizeof(spdu), NULL, SK_F_COPY);
755         return 0;
756 }
757
758 static int handle_probe_cmd(struct fio_net_cmd *cmd)
759 {
760         struct cmd_client_probe_pdu *pdu = (struct cmd_client_probe_pdu *) cmd->payload;
761         struct cmd_probe_reply_pdu probe;
762         uint64_t tag = cmd->tag;
763
764         dprint(FD_NET, "server: sending probe reply\n");
765
766         strcpy(me, (char *) pdu->server);
767
768         memset(&probe, 0, sizeof(probe));
769         gethostname((char *) probe.hostname, sizeof(probe.hostname));
770 #ifdef CONFIG_BIG_ENDIAN
771         probe.bigendian = 1;
772 #endif
773         strncpy((char *) probe.fio_version, fio_version_string, sizeof(probe.fio_version));
774
775         probe.os        = FIO_OS;
776         probe.arch      = FIO_ARCH;
777         probe.bpp       = sizeof(void *);
778         probe.cpus      = __cpu_to_le32(cpus_online());
779
780         /*
781          * If the client supports compression and we do too, then enable it
782          */
783         if (has_zlib && le64_to_cpu(pdu->flags) & FIO_PROBE_FLAG_ZLIB) {
784                 probe.flags = __cpu_to_le64(FIO_PROBE_FLAG_ZLIB);
785                 use_zlib = 1;
786         } else {
787                 probe.flags = 0;
788                 use_zlib = 0;
789         }
790
791         return fio_net_queue_cmd(FIO_NET_CMD_PROBE, &probe, sizeof(probe), &tag, SK_F_COPY);
792 }
793
794 static int handle_send_eta_cmd(struct fio_net_cmd *cmd)
795 {
796         struct jobs_eta *je;
797         uint64_t tag = cmd->tag;
798         size_t size;
799         int i;
800
801         dprint(FD_NET, "server sending status\n");
802
803         /*
804          * Fake ETA return if we don't have a local one, otherwise the client
805          * will end up timing out waiting for a response to the ETA request
806          */
807         je = get_jobs_eta(true, &size);
808         if (!je) {
809                 size = sizeof(*je);
810                 je = calloc(1, size);
811         } else {
812                 je->nr_running          = cpu_to_le32(je->nr_running);
813                 je->nr_ramp             = cpu_to_le32(je->nr_ramp);
814                 je->nr_pending          = cpu_to_le32(je->nr_pending);
815                 je->nr_setting_up       = cpu_to_le32(je->nr_setting_up);
816                 je->files_open          = cpu_to_le32(je->files_open);
817
818                 for (i = 0; i < DDIR_RWDIR_CNT; i++) {
819                         je->m_rate[i]   = cpu_to_le32(je->m_rate[i]);
820                         je->t_rate[i]   = cpu_to_le32(je->t_rate[i]);
821                         je->m_iops[i]   = cpu_to_le32(je->m_iops[i]);
822                         je->t_iops[i]   = cpu_to_le32(je->t_iops[i]);
823                         je->rate[i]     = cpu_to_le32(je->rate[i]);
824                         je->iops[i]     = cpu_to_le32(je->iops[i]);
825                 }
826
827                 je->elapsed_sec         = cpu_to_le64(je->elapsed_sec);
828                 je->eta_sec             = cpu_to_le64(je->eta_sec);
829                 je->nr_threads          = cpu_to_le32(je->nr_threads);
830                 je->is_pow2             = cpu_to_le32(je->is_pow2);
831                 je->unit_base           = cpu_to_le32(je->unit_base);
832         }
833
834         fio_net_queue_cmd(FIO_NET_CMD_ETA, je, size, &tag, SK_F_FREE);
835         return 0;
836 }
837
838 static int send_update_job_reply(uint64_t __tag, int error)
839 {
840         uint64_t tag = __tag;
841         uint32_t pdu_error;
842
843         pdu_error = __cpu_to_le32(error);
844         return fio_net_queue_cmd(FIO_NET_CMD_UPDATE_JOB, &pdu_error, sizeof(pdu_error), &tag, SK_F_COPY);
845 }
846
847 static int handle_update_job_cmd(struct fio_net_cmd *cmd)
848 {
849         struct cmd_add_job_pdu *pdu = (struct cmd_add_job_pdu *) cmd->payload;
850         struct thread_data *td;
851         uint32_t tnumber;
852
853         tnumber = le32_to_cpu(pdu->thread_number);
854
855         dprint(FD_NET, "server: updating options for job %u\n", tnumber);
856
857         if (!tnumber || tnumber > thread_number) {
858                 send_update_job_reply(cmd->tag, ENODEV);
859                 return 0;
860         }
861
862         td = &threads[tnumber - 1];
863         convert_thread_options_to_cpu(&td->o, &pdu->top);
864         send_update_job_reply(cmd->tag, 0);
865         return 0;
866 }
867
868 static int handle_trigger_cmd(struct fio_net_cmd *cmd)
869 {
870         struct cmd_vtrigger_pdu *pdu = (struct cmd_vtrigger_pdu *) cmd->payload;
871         char *buf = (char *) pdu->cmd;
872         struct all_io_list *rep;
873         size_t sz;
874
875         pdu->len = le16_to_cpu(pdu->len);
876         buf[pdu->len] = '\0';
877
878         rep = get_all_io_list(IO_LIST_ALL, &sz);
879         if (!rep) {
880                 struct all_io_list state;
881
882                 state.threads = cpu_to_le64((uint64_t) 0);
883                 fio_net_queue_cmd(FIO_NET_CMD_VTRIGGER, &state, sizeof(state), NULL, SK_F_COPY);
884         } else
885                 fio_net_queue_cmd(FIO_NET_CMD_VTRIGGER, rep, sz, NULL, SK_F_FREE);
886
887         exec_trigger(buf);
888         return 0;
889 }
890
891 static int handle_command(struct sk_out *sk_out, struct flist_head *job_list,
892                           struct fio_net_cmd *cmd)
893 {
894         int ret;
895
896         dprint(FD_NET, "server: got op [%s], pdu=%u, tag=%llx\n",
897                         fio_server_op(cmd->opcode), cmd->pdu_len,
898                         (unsigned long long) cmd->tag);
899
900         switch (cmd->opcode) {
901         case FIO_NET_CMD_QUIT:
902                 fio_terminate_threads(TERMINATE_ALL);
903                 ret = 0;
904                 break;
905         case FIO_NET_CMD_EXIT:
906                 exit_backend = 1;
907                 return -1;
908         case FIO_NET_CMD_LOAD_FILE:
909                 ret = handle_load_file_cmd(cmd);
910                 break;
911         case FIO_NET_CMD_JOB:
912                 ret = handle_job_cmd(cmd);
913                 break;
914         case FIO_NET_CMD_JOBLINE:
915                 ret = handle_jobline_cmd(cmd);
916                 break;
917         case FIO_NET_CMD_PROBE:
918                 ret = handle_probe_cmd(cmd);
919                 break;
920         case FIO_NET_CMD_SEND_ETA:
921                 ret = handle_send_eta_cmd(cmd);
922                 break;
923         case FIO_NET_CMD_RUN:
924                 ret = handle_run_cmd(sk_out, job_list, cmd);
925                 break;
926         case FIO_NET_CMD_UPDATE_JOB:
927                 ret = handle_update_job_cmd(cmd);
928                 break;
929         case FIO_NET_CMD_VTRIGGER:
930                 ret = handle_trigger_cmd(cmd);
931                 break;
932         case FIO_NET_CMD_SENDFILE: {
933                 struct cmd_sendfile_reply *in;
934                 struct cmd_reply *rep;
935
936                 rep = (struct cmd_reply *) (uintptr_t) cmd->tag;
937
938                 in = (struct cmd_sendfile_reply *) cmd->payload;
939                 in->size = le32_to_cpu(in->size);
940                 in->error = le32_to_cpu(in->error);
941                 if (in->error) {
942                         ret = 1;
943                         rep->error = in->error;
944                 } else {
945                         ret = 0;
946                         rep->data = smalloc(in->size);
947                         if (!rep->data) {
948                                 ret = 1;
949                                 rep->error = ENOMEM;
950                         } else {
951                                 rep->size = in->size;
952                                 memcpy(rep->data, in->data, in->size);
953                         }
954                 }
955                 fio_mutex_up(&rep->lock);
956                 break;
957                 }
958         default:
959                 log_err("fio: unknown opcode: %s\n", fio_server_op(cmd->opcode));
960                 ret = 1;
961         }
962
963         return ret;
964 }
965
966 /*
967  * Send a command with a separate PDU, not inlined in the command
968  */
969 static int fio_send_cmd_ext_pdu(int sk, uint16_t opcode, const void *buf,
970                                 off_t size, uint64_t tag, uint32_t flags)
971 {
972         struct fio_net_cmd cmd;
973         struct iovec iov[2];
974
975         iov[0].iov_base = (void *) &cmd;
976         iov[0].iov_len = sizeof(cmd);
977         iov[1].iov_base = (void *) buf;
978         iov[1].iov_len = size;
979
980         __fio_init_net_cmd(&cmd, opcode, size, tag);
981         cmd.flags = __cpu_to_le32(flags);
982         fio_net_cmd_crc_pdu(&cmd, buf);
983
984         return fio_sendv_data(sk, iov, 2);
985 }
986
987 static void finish_entry(struct sk_entry *entry)
988 {
989         if (entry->flags & SK_F_FREE)
990                 free(entry->buf);
991         else if (entry->flags & SK_F_COPY)
992                 sfree(entry->buf);
993
994         sfree(entry);
995 }
996
997 static void entry_set_flags_tag(struct sk_entry *entry, struct flist_head *list,
998                                 unsigned int *flags, uint64_t *tag)
999 {
1000         if (!flist_empty(list))
1001                 *flags = FIO_NET_CMD_F_MORE;
1002         else
1003                 *flags = 0;
1004
1005         if (entry->tagptr)
1006                 *tag = *entry->tagptr;
1007         else
1008                 *tag = 0;
1009 }
1010
1011 static int send_vec_entry(struct sk_out *sk_out, struct sk_entry *first)
1012 {
1013         unsigned int flags;
1014         uint64_t tag;
1015         int ret;
1016
1017         entry_set_flags_tag(first, &first->next, &flags, &tag);
1018
1019         ret = fio_send_cmd_ext_pdu(sk_out->sk, first->opcode, first->buf, first->size, tag, flags);
1020
1021         while (!flist_empty(&first->next)) {
1022                 struct sk_entry *next;
1023
1024                 next = flist_first_entry(&first->next, struct sk_entry, list);
1025                 flist_del_init(&next->list);
1026
1027                 entry_set_flags_tag(next, &first->next, &flags, &tag);
1028
1029                 ret += fio_send_cmd_ext_pdu(sk_out->sk, next->opcode, next->buf, next->size, tag, flags);
1030                 finish_entry(next);
1031         }
1032
1033         return ret;
1034 }
1035
1036 static int handle_sk_entry(struct sk_out *sk_out, struct sk_entry *entry)
1037 {
1038         int ret;
1039
1040         if (entry->flags & SK_F_VEC)
1041                 ret = send_vec_entry(sk_out, entry);
1042         if (entry->flags & SK_F_SIMPLE) {
1043                 uint64_t tag = 0;
1044
1045                 if (entry->tagptr)
1046                         tag = *entry->tagptr;
1047
1048                 ret = fio_net_send_simple_cmd(sk_out->sk, entry->opcode, tag, NULL);
1049         } else
1050                 ret = fio_net_send_cmd(sk_out->sk, entry->opcode, entry->buf, entry->size, entry->tagptr, NULL);
1051
1052         if (ret)
1053                 log_err("fio: failed handling cmd %s\n", fio_server_op(entry->opcode));
1054
1055         finish_entry(entry);
1056         return ret;
1057 }
1058
1059 static int handle_xmits(struct sk_out *sk_out)
1060 {
1061         struct sk_entry *entry;
1062         FLIST_HEAD(list);
1063         int ret = 0;
1064
1065         sk_lock(sk_out);
1066         if (flist_empty(&sk_out->list)) {
1067                 sk_unlock(sk_out);
1068                 return 0;
1069         }
1070
1071         flist_splice_init(&sk_out->list, &list);
1072         sk_unlock(sk_out);
1073
1074         while (!flist_empty(&list)) {
1075                 entry = flist_entry(list.next, struct sk_entry, list);
1076                 flist_del(&entry->list);
1077                 ret += handle_sk_entry(sk_out, entry);
1078         }
1079
1080         return ret;
1081 }
1082
1083 static int handle_connection(struct sk_out *sk_out)
1084 {
1085         struct fio_net_cmd *cmd = NULL;
1086         FLIST_HEAD(job_list);
1087         int ret = 0;
1088
1089         reset_fio_state();
1090
1091         /* read forever */
1092         while (!exit_backend) {
1093                 struct pollfd pfd = {
1094                         .fd     = sk_out->sk,
1095                         .events = POLLIN,
1096                 };
1097
1098                 ret = 0;
1099                 do {
1100                         int timeout = 1000;
1101
1102                         if (!flist_empty(&job_list))
1103                                 timeout = 100;
1104
1105                         handle_xmits(sk_out);
1106
1107                         ret = poll(&pfd, 1, 0);
1108                         if (ret < 0) {
1109                                 if (errno == EINTR)
1110                                         break;
1111                                 log_err("fio: poll: %s\n", strerror(errno));
1112                                 break;
1113                         } else if (!ret) {
1114                                 fio_server_check_jobs(&job_list);
1115                                 fio_mutex_down_timeout(sk_out->wait, timeout);
1116                                 continue;
1117                         }
1118
1119                         if (pfd.revents & POLLIN)
1120                                 break;
1121                         if (pfd.revents & (POLLERR|POLLHUP)) {
1122                                 ret = 1;
1123                                 break;
1124                         }
1125                 } while (!exit_backend);
1126
1127                 fio_server_check_jobs(&job_list);
1128
1129                 if (ret < 0)
1130                         break;
1131
1132                 cmd = fio_net_recv_cmd(sk_out->sk);
1133                 if (!cmd) {
1134                         ret = -1;
1135                         break;
1136                 }
1137
1138                 ret = handle_command(sk_out, &job_list, cmd);
1139                 if (ret)
1140                         break;
1141
1142                 free(cmd);
1143                 cmd = NULL;
1144         }
1145
1146         if (cmd)
1147                 free(cmd);
1148
1149         handle_xmits(sk_out);
1150
1151         close(sk_out->sk);
1152         _exit(ret);
1153 }
1154
1155 /* get the address on this host bound by the input socket, 
1156  * whether it is ipv6 or ipv4 */
1157
1158 int get_my_addr_str(int sk)
1159 {
1160         struct sockaddr_in6 myaddr6 = { 0, };
1161         struct sockaddr_in myaddr4 = { 0, };
1162         struct sockaddr *sockaddr_p;
1163         char *net_addr;
1164         socklen_t len;
1165         int ret;
1166
1167         if (use_ipv6) {
1168                 len = sizeof(myaddr6);
1169                 sockaddr_p = (struct sockaddr * )&myaddr6;
1170                 net_addr = (char * )&myaddr6.sin6_addr;
1171         } else {
1172                 len = sizeof(myaddr4);
1173                 sockaddr_p = (struct sockaddr * )&myaddr4;
1174                 net_addr = (char * )&myaddr4.sin_addr;
1175         }
1176
1177         ret = getsockname(sk, sockaddr_p, &len);
1178         if (ret) {
1179                 log_err("fio: getsockaddr: %s\n", strerror(errno));
1180                 return -1;
1181         }
1182
1183         if (!inet_ntop(use_ipv6?AF_INET6:AF_INET, net_addr, client_sockaddr_str, INET6_ADDRSTRLEN - 1)) {
1184                 log_err("inet_ntop: failed to convert addr to string\n");
1185                 return -1;
1186         }
1187
1188         dprint(FD_NET, "fio server bound to addr %s\n", client_sockaddr_str);
1189         return 0;
1190 }
1191
1192 static int accept_loop(int listen_sk)
1193 {
1194         struct sockaddr_in addr;
1195         struct sockaddr_in6 addr6;
1196         socklen_t len = use_ipv6 ? sizeof(addr6) : sizeof(addr);
1197         struct pollfd pfd;
1198         int ret = 0, sk, exitval = 0;
1199         struct sk_out *sk_out;
1200         FLIST_HEAD(conn_list);
1201
1202         dprint(FD_NET, "server enter accept loop\n");
1203
1204         fio_set_fd_nonblocking(listen_sk, "server");
1205
1206         sk_out = smalloc(sizeof(*sk_out));
1207         INIT_FLIST_HEAD(&sk_out->list);
1208         sk_out->lock = fio_mutex_init(FIO_MUTEX_UNLOCKED);
1209         sk_out->wait = fio_mutex_init(FIO_MUTEX_LOCKED);
1210
1211         pthread_setspecific(sk_out_key, sk_out);
1212
1213         while (!exit_backend) {
1214                 const char *from;
1215                 char buf[64];
1216                 pid_t pid;
1217
1218                 pfd.fd = listen_sk;
1219                 pfd.events = POLLIN;
1220                 do {
1221                         int timeout = 1000;
1222
1223                         if (!flist_empty(&conn_list))
1224                                 timeout = 100;
1225
1226                         ret = poll(&pfd, 1, timeout);
1227                         if (ret < 0) {
1228                                 if (errno == EINTR)
1229                                         break;
1230                                 log_err("fio: poll: %s\n", strerror(errno));
1231                                 break;
1232                         } else if (!ret) {
1233                                 fio_server_check_conns(&conn_list);
1234                                 continue;
1235                         }
1236
1237                         if (pfd.revents & POLLIN)
1238                                 break;
1239                 } while (!exit_backend);
1240
1241                 fio_server_check_conns(&conn_list);
1242
1243                 if (exit_backend || ret < 0)
1244                         break;
1245
1246                 if (use_ipv6)
1247                         sk = accept(listen_sk, (struct sockaddr *) &addr6, &len);
1248                 else
1249                         sk = accept(listen_sk, (struct sockaddr *) &addr, &len);
1250
1251                 if (sk < 0) {
1252                         log_err("fio: accept: %s\n", strerror(errno));
1253                         return -1;
1254                 }
1255
1256                 if (use_ipv6)
1257                         from = inet_ntop(AF_INET6, (struct sockaddr *) &addr6.sin6_addr, buf, sizeof(buf));
1258                 else
1259                         from = inet_ntop(AF_INET, (struct sockaddr *) &addr.sin_addr, buf, sizeof(buf));
1260
1261                 dprint(FD_NET, "server: connect from %s\n", from);
1262
1263                 sk_out->sk = sk;
1264
1265                 pid = fork();
1266                 if (pid) {
1267                         close(sk);
1268                         fio_server_add_conn_pid(&conn_list, pid);
1269                         pthread_setspecific(sk_out_key, sk_out);
1270                         continue;
1271                 }
1272
1273                 /* exits */
1274                 get_my_addr_str(sk); /* if error, it's already logged, non-fatal */
1275                 handle_connection(sk_out);
1276         }
1277
1278 #if 0
1279         fio_mutex_remove(sk_out->lock);
1280         fio_mutex_remove(sk_out->wait);
1281         sfree(sk_out);
1282         pthread_setspecific(sk_out_key, NULL);
1283 #endif
1284
1285         return exitval;
1286 }
1287
1288 int fio_server_text_output(int level, const char *buf, size_t len)
1289 {
1290         struct sk_out *sk_out = pthread_getspecific(sk_out_key);
1291         struct cmd_text_pdu *pdu;
1292         unsigned int tlen;
1293         struct timeval tv;
1294
1295         if (!sk_out || sk_out->sk == -1)
1296                 return -1;
1297
1298         tlen = sizeof(*pdu) + len;
1299         pdu = malloc(tlen);
1300
1301         pdu->level      = __cpu_to_le32(level);
1302         pdu->buf_len    = __cpu_to_le32(len);
1303
1304         gettimeofday(&tv, NULL);
1305         pdu->log_sec    = __cpu_to_le64(tv.tv_sec);
1306         pdu->log_usec   = __cpu_to_le64(tv.tv_usec);
1307
1308         memcpy(pdu->buf, buf, len);
1309
1310         fio_net_queue_cmd(FIO_NET_CMD_TEXT, pdu, tlen, NULL, SK_F_COPY);
1311         free(pdu);
1312         return len;
1313 }
1314
1315 static void convert_io_stat(struct io_stat *dst, struct io_stat *src)
1316 {
1317         dst->max_val    = cpu_to_le64(src->max_val);
1318         dst->min_val    = cpu_to_le64(src->min_val);
1319         dst->samples    = cpu_to_le64(src->samples);
1320
1321         /*
1322          * Encode to IEEE 754 for network transfer
1323          */
1324         dst->mean.u.i   = cpu_to_le64(fio_double_to_uint64(src->mean.u.f));
1325         dst->S.u.i      = cpu_to_le64(fio_double_to_uint64(src->S.u.f));
1326 }
1327
1328 static void convert_gs(struct group_run_stats *dst, struct group_run_stats *src)
1329 {
1330         int i;
1331
1332         for (i = 0; i < DDIR_RWDIR_CNT; i++) {
1333                 dst->max_run[i]         = cpu_to_le64(src->max_run[i]);
1334                 dst->min_run[i]         = cpu_to_le64(src->min_run[i]);
1335                 dst->max_bw[i]          = cpu_to_le64(src->max_bw[i]);
1336                 dst->min_bw[i]          = cpu_to_le64(src->min_bw[i]);
1337                 dst->io_kb[i]           = cpu_to_le64(src->io_kb[i]);
1338                 dst->agg[i]             = cpu_to_le64(src->agg[i]);
1339         }
1340
1341         dst->kb_base    = cpu_to_le32(src->kb_base);
1342         dst->unit_base  = cpu_to_le32(src->unit_base);
1343         dst->groupid    = cpu_to_le32(src->groupid);
1344         dst->unified_rw_rep     = cpu_to_le32(src->unified_rw_rep);
1345 }
1346
1347 /*
1348  * Send a CMD_TS, which packs struct thread_stat and group_run_stats
1349  * into a single payload.
1350  */
1351 void fio_server_send_ts(struct thread_stat *ts, struct group_run_stats *rs)
1352 {
1353         struct cmd_ts_pdu p;
1354         int i, j;
1355
1356         dprint(FD_NET, "server sending end stats\n");
1357
1358         memset(&p, 0, sizeof(p));
1359
1360         strncpy(p.ts.name, ts->name, FIO_JOBNAME_SIZE - 1);
1361         strncpy(p.ts.verror, ts->verror, FIO_VERROR_SIZE - 1);
1362         strncpy(p.ts.description, ts->description, FIO_JOBDESC_SIZE - 1);
1363
1364         p.ts.error              = cpu_to_le32(ts->error);
1365         p.ts.thread_number      = cpu_to_le32(ts->thread_number);
1366         p.ts.groupid            = cpu_to_le32(ts->groupid);
1367         p.ts.pid                = cpu_to_le32(ts->pid);
1368         p.ts.members            = cpu_to_le32(ts->members);
1369         p.ts.unified_rw_rep     = cpu_to_le32(ts->unified_rw_rep);
1370
1371         for (i = 0; i < DDIR_RWDIR_CNT; i++) {
1372                 convert_io_stat(&p.ts.clat_stat[i], &ts->clat_stat[i]);
1373                 convert_io_stat(&p.ts.slat_stat[i], &ts->slat_stat[i]);
1374                 convert_io_stat(&p.ts.lat_stat[i], &ts->lat_stat[i]);
1375                 convert_io_stat(&p.ts.bw_stat[i], &ts->bw_stat[i]);
1376         }
1377
1378         p.ts.usr_time           = cpu_to_le64(ts->usr_time);
1379         p.ts.sys_time           = cpu_to_le64(ts->sys_time);
1380         p.ts.ctx                = cpu_to_le64(ts->ctx);
1381         p.ts.minf               = cpu_to_le64(ts->minf);
1382         p.ts.majf               = cpu_to_le64(ts->majf);
1383         p.ts.clat_percentiles   = cpu_to_le64(ts->clat_percentiles);
1384         p.ts.percentile_precision = cpu_to_le64(ts->percentile_precision);
1385
1386         for (i = 0; i < FIO_IO_U_LIST_MAX_LEN; i++) {
1387                 fio_fp64_t *src = &ts->percentile_list[i];
1388                 fio_fp64_t *dst = &p.ts.percentile_list[i];
1389
1390                 dst->u.i = cpu_to_le64(fio_double_to_uint64(src->u.f));
1391         }
1392
1393         for (i = 0; i < FIO_IO_U_MAP_NR; i++) {
1394                 p.ts.io_u_map[i]        = cpu_to_le32(ts->io_u_map[i]);
1395                 p.ts.io_u_submit[i]     = cpu_to_le32(ts->io_u_submit[i]);
1396                 p.ts.io_u_complete[i]   = cpu_to_le32(ts->io_u_complete[i]);
1397         }
1398
1399         for (i = 0; i < FIO_IO_U_LAT_U_NR; i++) {
1400                 p.ts.io_u_lat_u[i]      = cpu_to_le32(ts->io_u_lat_u[i]);
1401                 p.ts.io_u_lat_m[i]      = cpu_to_le32(ts->io_u_lat_m[i]);
1402         }
1403
1404         for (i = 0; i < DDIR_RWDIR_CNT; i++)
1405                 for (j = 0; j < FIO_IO_U_PLAT_NR; j++)
1406                         p.ts.io_u_plat[i][j] = cpu_to_le32(ts->io_u_plat[i][j]);
1407
1408         for (i = 0; i < DDIR_RWDIR_CNT; i++) {
1409                 p.ts.total_io_u[i]      = cpu_to_le64(ts->total_io_u[i]);
1410                 p.ts.short_io_u[i]      = cpu_to_le64(ts->short_io_u[i]);
1411                 p.ts.drop_io_u[i]       = cpu_to_le64(ts->drop_io_u[i]);
1412         }
1413
1414         p.ts.total_submit       = cpu_to_le64(ts->total_submit);
1415         p.ts.total_complete     = cpu_to_le64(ts->total_complete);
1416
1417         for (i = 0; i < DDIR_RWDIR_CNT; i++) {
1418                 p.ts.io_bytes[i]        = cpu_to_le64(ts->io_bytes[i]);
1419                 p.ts.runtime[i]         = cpu_to_le64(ts->runtime[i]);
1420         }
1421
1422         p.ts.total_run_time     = cpu_to_le64(ts->total_run_time);
1423         p.ts.continue_on_error  = cpu_to_le16(ts->continue_on_error);
1424         p.ts.total_err_count    = cpu_to_le64(ts->total_err_count);
1425         p.ts.first_error        = cpu_to_le32(ts->first_error);
1426         p.ts.kb_base            = cpu_to_le32(ts->kb_base);
1427         p.ts.unit_base          = cpu_to_le32(ts->unit_base);
1428
1429         p.ts.latency_depth      = cpu_to_le32(ts->latency_depth);
1430         p.ts.latency_target     = cpu_to_le64(ts->latency_target);
1431         p.ts.latency_window     = cpu_to_le64(ts->latency_window);
1432         p.ts.latency_percentile.u.i = cpu_to_le64(fio_double_to_uint64(ts->latency_percentile.u.f));
1433
1434         p.ts.nr_block_infos     = le64_to_cpu(ts->nr_block_infos);
1435         for (i = 0; i < p.ts.nr_block_infos; i++)
1436                 p.ts.block_infos[i] = le32_to_cpu(ts->block_infos[i]);
1437
1438         convert_gs(&p.rs, rs);
1439
1440         fio_net_queue_cmd(FIO_NET_CMD_TS, &p, sizeof(p), NULL, SK_F_COPY);
1441 }
1442
1443 void fio_server_send_gs(struct group_run_stats *rs)
1444 {
1445         struct group_run_stats gs;
1446
1447         dprint(FD_NET, "server sending group run stats\n");
1448
1449         convert_gs(&gs, rs);
1450         fio_net_queue_cmd(FIO_NET_CMD_GS, &gs, sizeof(gs), NULL, SK_F_COPY);
1451 }
1452
1453 static void convert_agg(struct disk_util_agg *dst, struct disk_util_agg *src)
1454 {
1455         int i;
1456
1457         for (i = 0; i < 2; i++) {
1458                 dst->ios[i]     = cpu_to_le64(src->ios[i]);
1459                 dst->merges[i]  = cpu_to_le64(src->merges[i]);
1460                 dst->sectors[i] = cpu_to_le64(src->sectors[i]);
1461                 dst->ticks[i]   = cpu_to_le64(src->ticks[i]);
1462         }
1463
1464         dst->io_ticks           = cpu_to_le64(src->io_ticks);
1465         dst->time_in_queue      = cpu_to_le64(src->time_in_queue);
1466         dst->slavecount         = cpu_to_le32(src->slavecount);
1467         dst->max_util.u.i       = cpu_to_le64(fio_double_to_uint64(src->max_util.u.f));
1468 }
1469
1470 static void convert_dus(struct disk_util_stat *dst, struct disk_util_stat *src)
1471 {
1472         int i;
1473
1474         dst->name[FIO_DU_NAME_SZ - 1] = '\0';
1475         strncpy((char *) dst->name, (char *) src->name, FIO_DU_NAME_SZ - 1);
1476
1477         for (i = 0; i < 2; i++) {
1478                 dst->s.ios[i]           = cpu_to_le64(src->s.ios[i]);
1479                 dst->s.merges[i]        = cpu_to_le64(src->s.merges[i]);
1480                 dst->s.sectors[i]       = cpu_to_le64(src->s.sectors[i]);
1481                 dst->s.ticks[i]         = cpu_to_le64(src->s.ticks[i]);
1482         }
1483
1484         dst->s.io_ticks         = cpu_to_le64(src->s.io_ticks);
1485         dst->s.time_in_queue    = cpu_to_le64(src->s.time_in_queue);
1486         dst->s.msec             = cpu_to_le64(src->s.msec);
1487 }
1488
1489 void fio_server_send_du(void)
1490 {
1491         struct disk_util *du;
1492         struct flist_head *entry;
1493         struct cmd_du_pdu pdu;
1494
1495         dprint(FD_NET, "server: sending disk_util %d\n", !flist_empty(&disk_list));
1496
1497         memset(&pdu, 0, sizeof(pdu));
1498
1499         flist_for_each(entry, &disk_list) {
1500                 du = flist_entry(entry, struct disk_util, list);
1501
1502                 convert_dus(&pdu.dus, &du->dus);
1503                 convert_agg(&pdu.agg, &du->agg);
1504
1505                 fio_net_queue_cmd(FIO_NET_CMD_DU, &pdu, sizeof(pdu), NULL, SK_F_COPY);
1506         }
1507 }
1508
1509 static int fio_send_iolog_gz(struct sk_entry *first, struct io_log *log)
1510 {
1511         int ret = 0;
1512 #ifdef CONFIG_ZLIB
1513         struct sk_entry *entry;
1514         z_stream stream;
1515         void *out_pdu;
1516
1517         /*
1518          * Dirty - since the log is potentially huge, compress it into
1519          * FIO_SERVER_MAX_FRAGMENT_PDU chunks and let the receiving
1520          * side defragment it.
1521          */
1522         out_pdu = malloc(FIO_SERVER_MAX_FRAGMENT_PDU);
1523
1524         stream.zalloc = Z_NULL;
1525         stream.zfree = Z_NULL;
1526         stream.opaque = Z_NULL;
1527
1528         if (deflateInit(&stream, Z_DEFAULT_COMPRESSION) != Z_OK) {
1529                 ret = 1;
1530                 goto err;
1531         }
1532
1533         stream.next_in = (void *) log->log;
1534         stream.avail_in = log->nr_samples * log_entry_sz(log);
1535
1536         do {
1537                 unsigned int this_len;
1538
1539                 stream.avail_out = FIO_SERVER_MAX_FRAGMENT_PDU;
1540                 stream.next_out = out_pdu;
1541                 ret = deflate(&stream, Z_FINISH);
1542                 /* may be Z_OK, or Z_STREAM_END */
1543                 if (ret < 0)
1544                         goto err_zlib;
1545
1546                 this_len = FIO_SERVER_MAX_FRAGMENT_PDU - stream.avail_out;
1547
1548                 entry = fio_net_prep_cmd(FIO_NET_CMD_IOLOG, out_pdu, this_len,
1549                                                 NULL, SK_F_FREE | SK_F_VEC);
1550                 flist_add_tail(&entry->list, &first->next);
1551         } while (stream.avail_in);
1552
1553 err_zlib:
1554         deflateEnd(&stream);
1555 err:
1556         free(out_pdu);
1557 #endif
1558         return ret;
1559 }
1560
1561 int fio_send_iolog(struct thread_data *td, struct io_log *log, const char *name)
1562 {
1563         struct cmd_iolog_pdu pdu;
1564         struct sk_entry *first;
1565         int i, ret = 0;
1566
1567         pdu.nr_samples = cpu_to_le64(log->nr_samples);
1568         pdu.thread_number = cpu_to_le32(td->thread_number);
1569         pdu.log_type = cpu_to_le32(log->log_type);
1570         pdu.compressed = cpu_to_le32(use_zlib);
1571
1572         strncpy((char *) pdu.name, name, FIO_NET_NAME_MAX);
1573         pdu.name[FIO_NET_NAME_MAX - 1] = '\0';
1574
1575         for (i = 0; i < log->nr_samples; i++) {
1576                 struct io_sample *s = get_sample(log, i);
1577
1578                 s->time         = cpu_to_le64(s->time);
1579                 s->val          = cpu_to_le64(s->val);
1580                 s->__ddir       = cpu_to_le32(s->__ddir);
1581                 s->bs           = cpu_to_le32(s->bs);
1582
1583                 if (log->log_offset) {
1584                         struct io_sample_offset *so = (void *) s;
1585
1586                         so->offset = cpu_to_le64(so->offset);
1587                 }
1588         }
1589
1590         /*
1591          * Assemble header entry first
1592          */
1593         first = fio_net_prep_cmd(FIO_NET_CMD_IOLOG, &pdu, sizeof(pdu), NULL, SK_F_COPY | SK_F_VEC);
1594
1595         /*
1596          * Now append actual log entries. Compress if we can, otherwise just
1597          * plain text output.
1598          */
1599         if (use_zlib)
1600                 ret = fio_send_iolog_gz(first, log);
1601         else {
1602                 struct sk_entry *entry;
1603
1604                 entry = fio_net_prep_cmd(FIO_NET_CMD_IOLOG, log->log,
1605                                         log->nr_samples * log_entry_sz(log),
1606                                         NULL, SK_F_FREE | SK_F_VEC);
1607                 flist_add_tail(&entry->list, &first->next);
1608         }
1609
1610         return ret;
1611 }
1612
1613 void fio_server_send_add_job(struct thread_data *td)
1614 {
1615         struct cmd_add_job_pdu pdu;
1616
1617         memset(&pdu, 0, sizeof(pdu));
1618         pdu.thread_number = cpu_to_le32(td->thread_number);
1619         pdu.groupid = cpu_to_le32(td->groupid);
1620         convert_thread_options_to_net(&pdu.top, &td->o);
1621
1622         fio_net_queue_cmd(FIO_NET_CMD_ADD_JOB, &pdu, sizeof(pdu), NULL, SK_F_COPY);
1623 }
1624
1625 void fio_server_send_start(struct thread_data *td)
1626 {
1627         struct sk_out *sk_out = pthread_getspecific(sk_out_key);
1628
1629         assert(sk_out->sk != -1);
1630
1631         fio_net_queue_cmd(FIO_NET_CMD_SERVER_START, NULL, 0, 0, SK_F_SIMPLE);
1632 }
1633
1634 int fio_server_get_verify_state(const char *name, int threadnumber,
1635                                 void **datap, int *version)
1636 {
1637         struct thread_io_list *s;
1638         struct cmd_sendfile out;
1639         struct cmd_reply *rep;
1640         uint64_t tag;
1641         void *data;
1642
1643         dprint(FD_NET, "server: request verify state\n");
1644
1645         rep = smalloc(sizeof(*rep));
1646         if (!rep) {
1647                 log_err("fio: smalloc pool too small\n");
1648                 return 1;
1649         }
1650
1651         __fio_mutex_init(&rep->lock, FIO_MUTEX_LOCKED);
1652         rep->data = NULL;
1653         rep->error = 0;
1654
1655         verify_state_gen_name((char *) out.path, sizeof(out.path), name, me,
1656                                 threadnumber);
1657         tag = (uint64_t) (uintptr_t) rep;
1658         fio_net_queue_cmd(FIO_NET_CMD_SENDFILE, &out, sizeof(out), &tag, SK_F_COPY);
1659
1660         /*
1661          * Wait for the backend to receive the reply
1662          */
1663         if (fio_mutex_down_timeout(&rep->lock, 10000)) {
1664                 log_err("fio: timed out waiting for reply\n");
1665                 goto fail;
1666         }
1667
1668         if (rep->error) {
1669                 log_err("fio: failure on receiving state file: %s\n", strerror(rep->error));
1670 fail:
1671                 *datap = NULL;
1672                 sfree(rep);
1673                 fio_net_queue_quit();
1674                 return 1;
1675         }
1676
1677         /*
1678          * The format is verify_state_hdr, then thread_io_list. Verify
1679          * the header, and the thread_io_list checksum
1680          */
1681         s = rep->data + sizeof(struct verify_state_hdr);
1682         if (verify_state_hdr(rep->data, s, version))
1683                 goto fail;
1684
1685         /*
1686          * Don't need the header from now, copy just the thread_io_list
1687          */
1688         rep->size -= sizeof(struct verify_state_hdr);
1689         data = malloc(rep->size);
1690         memcpy(data, s, rep->size);
1691         *datap = data;
1692
1693         sfree(rep->data);
1694         __fio_mutex_remove(&rep->lock);
1695         sfree(rep);
1696         return 0;
1697 }
1698
1699 static int fio_init_server_ip(void)
1700 {
1701         struct sockaddr *addr;
1702         socklen_t socklen;
1703         char buf[80];
1704         const char *str;
1705         int sk, opt;
1706
1707         if (use_ipv6)
1708                 sk = socket(AF_INET6, SOCK_STREAM, 0);
1709         else
1710                 sk = socket(AF_INET, SOCK_STREAM, 0);
1711
1712         if (sk < 0) {
1713                 log_err("fio: socket: %s\n", strerror(errno));
1714                 return -1;
1715         }
1716
1717         opt = 1;
1718         if (setsockopt(sk, SOL_SOCKET, SO_REUSEADDR, (void *)&opt, sizeof(opt)) < 0) {
1719                 log_err("fio: setsockopt(REUSEADDR): %s\n", strerror(errno));
1720                 close(sk);
1721                 return -1;
1722         }
1723 #ifdef SO_REUSEPORT
1724         if (setsockopt(sk, SOL_SOCKET, SO_REUSEPORT, &opt, sizeof(opt)) < 0) {
1725                 log_err("fio: setsockopt(REUSEPORT): %s\n", strerror(errno));
1726                 close(sk);
1727                 return -1;
1728         }
1729 #endif
1730
1731         if (use_ipv6) {
1732                 const void *src = &saddr_in6.sin6_addr;
1733
1734                 addr = (struct sockaddr *) &saddr_in6;
1735                 socklen = sizeof(saddr_in6);
1736                 saddr_in6.sin6_family = AF_INET6;
1737                 str = inet_ntop(AF_INET6, src, buf, sizeof(buf));
1738         } else {
1739                 const void *src = &saddr_in.sin_addr;
1740
1741                 addr = (struct sockaddr *) &saddr_in;
1742                 socklen = sizeof(saddr_in);
1743                 saddr_in.sin_family = AF_INET;
1744                 str = inet_ntop(AF_INET, src, buf, sizeof(buf));
1745         }
1746
1747         if (bind(sk, addr, socklen) < 0) {
1748                 log_err("fio: bind: %s\n", strerror(errno));
1749                 log_info("fio: failed with IPv%c %s\n", use_ipv6 ? '6' : '4', str);
1750                 close(sk);
1751                 return -1;
1752         }
1753
1754         return sk;
1755 }
1756
1757 static int fio_init_server_sock(void)
1758 {
1759         struct sockaddr_un addr;
1760         socklen_t len;
1761         mode_t mode;
1762         int sk;
1763
1764         sk = socket(AF_UNIX, SOCK_STREAM, 0);
1765         if (sk < 0) {
1766                 log_err("fio: socket: %s\n", strerror(errno));
1767                 return -1;
1768         }
1769
1770         mode = umask(000);
1771
1772         memset(&addr, 0, sizeof(addr));
1773         addr.sun_family = AF_UNIX;
1774         strncpy(addr.sun_path, bind_sock, sizeof(addr.sun_path) - 1);
1775
1776         len = sizeof(addr.sun_family) + strlen(bind_sock) + 1;
1777
1778         if (bind(sk, (struct sockaddr *) &addr, len) < 0) {
1779                 log_err("fio: bind: %s\n", strerror(errno));
1780                 close(sk);
1781                 return -1;
1782         }
1783
1784         umask(mode);
1785         return sk;
1786 }
1787
1788 static int fio_init_server_connection(void)
1789 {
1790         char bind_str[128];
1791         int sk;
1792
1793         dprint(FD_NET, "starting server\n");
1794
1795         if (!bind_sock)
1796                 sk = fio_init_server_ip();
1797         else
1798                 sk = fio_init_server_sock();
1799
1800         if (sk < 0)
1801                 return sk;
1802
1803         memset(bind_str, 0, sizeof(bind_str));
1804
1805         if (!bind_sock) {
1806                 char *p, port[16];
1807                 const void *src;
1808                 int af;
1809
1810                 if (use_ipv6) {
1811                         af = AF_INET6;
1812                         src = &saddr_in6.sin6_addr;
1813                 } else {
1814                         af = AF_INET;
1815                         src = &saddr_in.sin_addr;
1816                 }
1817
1818                 p = (char *) inet_ntop(af, src, bind_str, sizeof(bind_str));
1819
1820                 sprintf(port, ",%u", fio_net_port);
1821                 if (p)
1822                         strcat(p, port);
1823                 else
1824                         strncpy(bind_str, port, sizeof(bind_str) - 1);
1825         } else
1826                 strncpy(bind_str, bind_sock, sizeof(bind_str) - 1);
1827
1828         log_info("fio: server listening on %s\n", bind_str);
1829
1830         if (listen(sk, 0) < 0) {
1831                 log_err("fio: listen: %s\n", strerror(errno));
1832                 close(sk);
1833                 return -1;
1834         }
1835
1836         return sk;
1837 }
1838
1839 int fio_server_parse_host(const char *host, int ipv6, struct in_addr *inp,
1840                           struct in6_addr *inp6)
1841
1842 {
1843         int ret = 0;
1844
1845         if (ipv6)
1846                 ret = inet_pton(AF_INET6, host, inp6);
1847         else
1848                 ret = inet_pton(AF_INET, host, inp);
1849
1850         if (ret != 1) {
1851                 struct addrinfo hints, *res;
1852
1853                 memset(&hints, 0, sizeof(hints));
1854                 hints.ai_family = ipv6 ? AF_INET6 : AF_INET;
1855                 hints.ai_socktype = SOCK_STREAM;
1856
1857                 ret = getaddrinfo(host, NULL, &hints, &res);
1858                 if (ret) {
1859                         log_err("fio: failed to resolve <%s> (%s)\n", host,
1860                                         gai_strerror(ret));
1861                         return 1;
1862                 }
1863
1864                 if (ipv6)
1865                         memcpy(inp6, &((struct sockaddr_in6 *) res->ai_addr)->sin6_addr, sizeof(*inp6));
1866                 else
1867                         memcpy(inp, &((struct sockaddr_in *) res->ai_addr)->sin_addr, sizeof(*inp));
1868
1869                 ret = 1;
1870                 freeaddrinfo(res);
1871         }
1872
1873         return !(ret == 1);
1874 }
1875
1876 /*
1877  * Parse a host/ip/port string. Reads from 'str'.
1878  *
1879  * Outputs:
1880  *
1881  * For IPv4:
1882  *      *ptr is the host, *port is the port, inp is the destination.
1883  * For IPv6:
1884  *      *ptr is the host, *port is the port, inp6 is the dest, and *ipv6 is 1.
1885  * For local domain sockets:
1886  *      *ptr is the filename, *is_sock is 1.
1887  */
1888 int fio_server_parse_string(const char *str, char **ptr, int *is_sock,
1889                             int *port, struct in_addr *inp,
1890                             struct in6_addr *inp6, int *ipv6)
1891 {
1892         const char *host = str;
1893         char *portp;
1894         int lport = 0;
1895
1896         *ptr = NULL;
1897         *is_sock = 0;
1898         *port = fio_net_port;
1899         *ipv6 = 0;
1900
1901         if (!strncmp(str, "sock:", 5)) {
1902                 *ptr = strdup(str + 5);
1903                 *is_sock = 1;
1904
1905                 return 0;
1906         }
1907
1908         /*
1909          * Is it ip:<ip or host>:port
1910          */
1911         if (!strncmp(host, "ip:", 3))
1912                 host += 3;
1913         else if (!strncmp(host, "ip4:", 4))
1914                 host += 4;
1915         else if (!strncmp(host, "ip6:", 4)) {
1916                 host += 4;
1917                 *ipv6 = 1;
1918         } else if (host[0] == ':') {
1919                 /* String is :port */
1920                 host++;
1921                 lport = atoi(host);
1922                 if (!lport || lport > 65535) {
1923                         log_err("fio: bad server port %u\n", lport);
1924                         return 1;
1925                 }
1926                 /* no hostname given, we are done */
1927                 *port = lport;
1928                 return 0;
1929         }
1930
1931         /*
1932          * If no port seen yet, check if there's a last ',' at the end
1933          */
1934         if (!lport) {
1935                 portp = strchr(host, ',');
1936                 if (portp) {
1937                         *portp = '\0';
1938                         portp++;
1939                         lport = atoi(portp);
1940                         if (!lport || lport > 65535) {
1941                                 log_err("fio: bad server port %u\n", lport);
1942                                 return 1;
1943                         }
1944                 }
1945         }
1946
1947         if (lport)
1948                 *port = lport;
1949
1950         if (!strlen(host))
1951                 return 0;
1952
1953         *ptr = strdup(host);
1954
1955         if (fio_server_parse_host(*ptr, *ipv6, inp, inp6)) {
1956                 free(*ptr);
1957                 *ptr = NULL;
1958                 return 1;
1959         }
1960
1961         if (*port == 0)
1962                 *port = fio_net_port;
1963
1964         return 0;
1965 }
1966
1967 /*
1968  * Server arg should be one of:
1969  *
1970  * sock:/path/to/socket
1971  *   ip:1.2.3.4
1972  *      1.2.3.4
1973  *
1974  * Where sock uses unix domain sockets, and ip binds the server to
1975  * a specific interface. If no arguments are given to the server, it
1976  * uses IP and binds to 0.0.0.0.
1977  *
1978  */
1979 static int fio_handle_server_arg(void)
1980 {
1981         int port = fio_net_port;
1982         int is_sock, ret = 0;
1983
1984         saddr_in.sin_addr.s_addr = htonl(INADDR_ANY);
1985
1986         if (!fio_server_arg)
1987                 goto out;
1988
1989         ret = fio_server_parse_string(fio_server_arg, &bind_sock, &is_sock,
1990                                         &port, &saddr_in.sin_addr,
1991                                         &saddr_in6.sin6_addr, &use_ipv6);
1992
1993         if (!is_sock && bind_sock) {
1994                 free(bind_sock);
1995                 bind_sock = NULL;
1996         }
1997
1998 out:
1999         fio_net_port = port;
2000         saddr_in.sin_port = htons(port);
2001         saddr_in6.sin6_port = htons(port);
2002         return ret;
2003 }
2004
2005 static void sig_int(int sig)
2006 {
2007         if (bind_sock)
2008                 unlink(bind_sock);
2009 }
2010
2011 static void set_sig_handlers(void)
2012 {
2013         struct sigaction act;
2014
2015         memset(&act, 0, sizeof(act));
2016         act.sa_handler = sig_int;
2017         act.sa_flags = SA_RESTART;
2018         sigaction(SIGINT, &act, NULL);
2019 }
2020
2021 static int fio_server(void)
2022 {
2023         int sk, ret;
2024
2025         dprint(FD_NET, "starting server\n");
2026
2027         if (fio_handle_server_arg())
2028                 return -1;
2029
2030         sk = fio_init_server_connection();
2031         if (sk < 0)
2032                 return -1;
2033
2034         set_sig_handlers();
2035
2036         if (pthread_key_create(&sk_out_key, NULL))
2037                 log_err("fio: can't create sk_out backend key\n");
2038
2039         ret = accept_loop(sk);
2040
2041         close(sk);
2042
2043         if (fio_server_arg) {
2044                 free(fio_server_arg);
2045                 fio_server_arg = NULL;
2046         }
2047         if (bind_sock)
2048                 free(bind_sock);
2049
2050         return ret;
2051 }
2052
2053 void fio_server_got_signal(int signal)
2054 {
2055         struct sk_out *sk_out = pthread_getspecific(sk_out_key);
2056
2057         assert(sk_out);
2058
2059         if (signal == SIGPIPE)
2060                 sk_out->sk = -1;
2061         else {
2062                 log_info("\nfio: terminating on signal %d\n", signal);
2063                 exit_backend = 1;
2064         }
2065 }
2066
2067 static int check_existing_pidfile(const char *pidfile)
2068 {
2069         struct stat sb;
2070         char buf[16];
2071         pid_t pid;
2072         FILE *f;
2073
2074         if (stat(pidfile, &sb))
2075                 return 0;
2076
2077         f = fopen(pidfile, "r");
2078         if (!f)
2079                 return 0;
2080
2081         if (fread(buf, sb.st_size, 1, f) <= 0) {
2082                 fclose(f);
2083                 return 1;
2084         }
2085         fclose(f);
2086
2087         pid = atoi(buf);
2088         if (kill(pid, SIGCONT) < 0)
2089                 return errno != ESRCH;
2090
2091         return 1;
2092 }
2093
2094 static int write_pid(pid_t pid, const char *pidfile)
2095 {
2096         FILE *fpid;
2097
2098         fpid = fopen(pidfile, "w");
2099         if (!fpid) {
2100                 log_err("fio: failed opening pid file %s\n", pidfile);
2101                 return 1;
2102         }
2103
2104         fprintf(fpid, "%u\n", (unsigned int) pid);
2105         fclose(fpid);
2106         return 0;
2107 }
2108
2109 /*
2110  * If pidfile is specified, background us.
2111  */
2112 int fio_start_server(char *pidfile)
2113 {
2114         pid_t pid;
2115         int ret;
2116
2117 #if defined(WIN32)
2118         WSADATA wsd;
2119         WSAStartup(MAKEWORD(2, 2), &wsd);
2120 #endif
2121
2122         if (!pidfile)
2123                 return fio_server();
2124
2125         if (check_existing_pidfile(pidfile)) {
2126                 log_err("fio: pidfile %s exists and server appears alive\n",
2127                                                                 pidfile);
2128                 free(pidfile);
2129                 return -1;
2130         }
2131
2132         pid = fork();
2133         if (pid < 0) {
2134                 log_err("fio: failed server fork: %s", strerror(errno));
2135                 free(pidfile);
2136                 return -1;
2137         } else if (pid) {
2138                 ret = write_pid(pid, pidfile);
2139                 free(pidfile);
2140                 _exit(ret);
2141         }
2142
2143         setsid();
2144         openlog("fio", LOG_NDELAY|LOG_NOWAIT|LOG_PID, LOG_USER);
2145         log_syslog = 1;
2146         close(STDIN_FILENO);
2147         close(STDOUT_FILENO);
2148         close(STDERR_FILENO);
2149         f_out = NULL;
2150         f_err = NULL;
2151
2152         ret = fio_server();
2153
2154         closelog();
2155         unlink(pidfile);
2156         free(pidfile);
2157         return ret;
2158 }
2159
2160 void fio_server_set_arg(const char *arg)
2161 {
2162         fio_server_arg = strdup(arg);
2163 }