client: suppress non JSON default outputs on --output-format=json/json+
[fio.git] / client.c
1 #include <stdio.h>
2 #include <stdlib.h>
3 #include <unistd.h>
4 #include <errno.h>
5 #include <fcntl.h>
6 #include <poll.h>
7 #include <sys/types.h>
8 #include <sys/stat.h>
9 #include <sys/socket.h>
10 #include <sys/un.h>
11 #include <netinet/in.h>
12 #include <arpa/inet.h>
13 #include <netdb.h>
14 #include <signal.h>
15 #ifdef CONFIG_ZLIB
16 #include <zlib.h>
17 #endif
18
19 #include "fio.h"
20 #include "client.h"
21 #include "server.h"
22 #include "flist.h"
23 #include "hash.h"
24 #include "verify-state.h"
25
26 static void handle_du(struct fio_client *client, struct fio_net_cmd *cmd);
27 static void handle_ts(struct fio_client *client, struct fio_net_cmd *cmd);
28 static void handle_gs(struct fio_client *client, struct fio_net_cmd *cmd);
29 static void handle_probe(struct fio_client *client, struct fio_net_cmd *cmd);
30 static void handle_text(struct fio_client *client, struct fio_net_cmd *cmd);
31 static void handle_stop(struct fio_client *client);
32 static void handle_start(struct fio_client *client, struct fio_net_cmd *cmd);
33
34 static void convert_text(struct fio_net_cmd *cmd);
35 static void client_display_thread_status(struct jobs_eta *je);
36
37 struct client_ops fio_client_ops = {
38         .text           = handle_text,
39         .disk_util      = handle_du,
40         .thread_status  = handle_ts,
41         .group_stats    = handle_gs,
42         .stop           = handle_stop,
43         .start          = handle_start,
44         .eta            = client_display_thread_status,
45         .probe          = handle_probe,
46         .eta_msec       = FIO_CLIENT_DEF_ETA_MSEC,
47         .client_type    = FIO_CLIENT_TYPE_CLI,
48 };
49
50 static struct timespec eta_ts;
51
52 static FLIST_HEAD(client_list);
53 static FLIST_HEAD(eta_list);
54
55 static FLIST_HEAD(arg_list);
56
57 struct thread_stat client_ts;
58 struct group_run_stats client_gs;
59 int sum_stat_clients;
60
61 static int sum_stat_nr;
62 static struct json_object *root = NULL;
63 static struct json_object *job_opt_object = NULL;
64 static struct json_array *clients_array = NULL;
65 static struct json_array *du_array = NULL;
66
67 static int error_clients;
68
69 #define FIO_CLIENT_HASH_BITS    7
70 #define FIO_CLIENT_HASH_SZ      (1 << FIO_CLIENT_HASH_BITS)
71 #define FIO_CLIENT_HASH_MASK    (FIO_CLIENT_HASH_SZ - 1)
72 static struct flist_head client_hash[FIO_CLIENT_HASH_SZ];
73
74 static struct cmd_iolog_pdu *convert_iolog(struct fio_net_cmd *, bool *);
75
76 static void fio_client_add_hash(struct fio_client *client)
77 {
78         int bucket = hash_long(client->fd, FIO_CLIENT_HASH_BITS);
79
80         bucket &= FIO_CLIENT_HASH_MASK;
81         flist_add(&client->hash_list, &client_hash[bucket]);
82 }
83
84 static void fio_client_remove_hash(struct fio_client *client)
85 {
86         if (!flist_empty(&client->hash_list))
87                 flist_del_init(&client->hash_list);
88 }
89
90 static void fio_init fio_client_hash_init(void)
91 {
92         int i;
93
94         for (i = 0; i < FIO_CLIENT_HASH_SZ; i++)
95                 INIT_FLIST_HEAD(&client_hash[i]);
96 }
97
/*
 * Read exactly @size bytes from @fd into @data.
 *
 * Short reads are retried until the full amount has arrived; EAGAIN and
 * EINTR are retried as well. Any other read error, or end-of-file before
 * @size bytes were received, stops the loop.
 *
 * Returns 0 when all bytes were read, EAGAIN otherwise.
 */
static int read_data(int fd, void *data, size_t size)
{
        /*
         * Advance a char pointer: arithmetic on void * is a compiler
         * extension, not ISO C.
         */
        char *p = data;

        while (size) {
                ssize_t ret = read(fd, p, size);

                if (ret < 0) {
                        if (errno == EAGAIN || errno == EINTR)
                                continue;
                        break;
                }
                if (!ret)
                        break;

                p += ret;
                size -= ret;
        }

        return size ? EAGAIN : 0;
}
121
122 static int read_ini_data(int fd, void *data, size_t size)
123 {
124         char *p = data;
125         int ret = 0;
126         FILE *fp;
127         int dupfd;
128
129         dupfd = dup(fd);
130         if (dupfd < 0)
131                 return errno;
132
133         fp = fdopen(dupfd, "r");
134         if (!fp) {
135                 ret = errno;
136                 close(dupfd);
137                 goto out;
138         }
139
140         while (1) {
141                 ssize_t len;
142                 char buf[OPT_LEN_MAX+1], *sub;
143
144                 if (!fgets(buf, sizeof(buf), fp)) {
145                         if (ferror(fp)) {
146                                 if (errno == EAGAIN || errno == EINTR)
147                                         continue;
148                                 ret = errno;
149                         }
150                         break;
151                 }
152
153                 sub = fio_option_dup_subs(buf);
154                 len = strlen(sub);
155                 if (len + 1 > size) {
156                         log_err("fio: no space left to read data\n");
157                         free(sub);
158                         ret = ENOSPC;
159                         break;
160                 }
161
162                 memcpy(p, sub, len);
163                 free(sub);
164                 p += len;
165                 *p = '\0';
166                 size -= len;
167         }
168
169         fclose(fp);
170 out:
171         return ret;
172 }
173
174 static void fio_client_json_init(void)
175 {
176         char time_buf[32];
177         time_t time_p;
178
179         if (!(output_format & FIO_OUTPUT_JSON))
180                 return;
181
182         time(&time_p);
183         os_ctime_r((const time_t *) &time_p, time_buf, sizeof(time_buf));
184         time_buf[strlen(time_buf) - 1] = '\0';
185
186         root = json_create_object();
187         json_object_add_value_string(root, "fio version", fio_version_string);
188         json_object_add_value_int(root, "timestamp", time_p);
189         json_object_add_value_string(root, "time", time_buf);
190
191         job_opt_object = json_create_object();
192         json_object_add_value_object(root, "global options", job_opt_object);
193         clients_array = json_create_array();
194         json_object_add_value_array(root, "client_stats", clients_array);
195         du_array = json_create_array();
196         json_object_add_value_array(root, "disk_util", du_array);
197 }
198
199 static void fio_client_json_fini(void)
200 {
201         if (!(output_format & FIO_OUTPUT_JSON))
202                 return;
203
204         log_info("\n");
205         json_print_object(root, NULL);
206         log_info("\n");
207         json_free_object(root);
208         root = NULL;
209         clients_array = NULL;
210         du_array = NULL;
211 }
212
213 static struct fio_client *find_client_by_fd(int fd)
214 {
215         int bucket = hash_long(fd, FIO_CLIENT_HASH_BITS) & FIO_CLIENT_HASH_MASK;
216         struct fio_client *client;
217         struct flist_head *entry;
218
219         flist_for_each(entry, &client_hash[bucket]) {
220                 client = flist_entry(entry, struct fio_client, hash_list);
221
222                 if (client->fd == fd) {
223                         client->refs++;
224                         return client;
225                 }
226         }
227
228         return NULL;
229 }
230
231 void fio_put_client(struct fio_client *client)
232 {
233         if (--client->refs)
234                 return;
235
236         free(client->hostname);
237         if (client->argv)
238                 free(client->argv);
239         if (client->name)
240                 free(client->name);
241         while (client->nr_files) {
242                 struct client_file *cf = &client->files[--client->nr_files];
243
244                 free(cf->file);
245         }
246         if (client->files)
247                 free(client->files);
248         if (client->opt_lists)
249                 free(client->opt_lists);
250
251         if (!client->did_stat)
252                 sum_stat_clients--;
253
254         if (client->error)
255                 error_clients++;
256
257         free(client);
258 }
259
260 static int fio_client_dec_jobs_eta(struct client_eta *eta, client_eta_op eta_fn)
261 {
262         if (!--eta->pending) {
263                 eta_fn(&eta->eta);
264                 free(eta);
265                 return 0;
266         }
267
268         return 1;
269 }
270
271 static void fio_drain_client_text(struct fio_client *client)
272 {
273         do {
274                 struct fio_net_cmd *cmd;
275
276                 cmd = fio_net_recv_cmd(client->fd, false);
277                 if (!cmd)
278                         break;
279
280                 if (cmd->opcode == FIO_NET_CMD_TEXT) {
281                         convert_text(cmd);
282                         client->ops->text(client, cmd);
283                 }
284
285                 free(cmd);
286         } while (1);
287 }
288
289 static void remove_client(struct fio_client *client)
290 {
291         assert(client->refs);
292
293         dprint(FD_NET, "client: removed <%s>\n", client->hostname);
294
295         fio_drain_client_text(client);
296
297         if (!flist_empty(&client->list))
298                 flist_del_init(&client->list);
299
300         fio_client_remove_hash(client);
301
302         if (!flist_empty(&client->eta_list)) {
303                 flist_del_init(&client->eta_list);
304                 fio_client_dec_jobs_eta(client->eta_in_flight, client->ops->eta);
305         }
306
307         close(client->fd);
308         client->fd = -1;
309
310         if (client->ops->removed)
311                 client->ops->removed(client);
312
313         nr_clients--;
314         fio_put_client(client);
315 }
316
317 struct fio_client *fio_get_client(struct fio_client *client)
318 {
319         client->refs++;
320         return client;
321 }
322
323 static void __fio_client_add_cmd_option(struct fio_client *client,
324                                         const char *opt)
325 {
326         int index;
327
328         index = client->argc++;
329         client->argv = realloc(client->argv, sizeof(char *) * client->argc);
330         client->argv[index] = strdup(opt);
331         dprint(FD_NET, "client: add cmd %d: %s\n", index, opt);
332 }
333
334 void fio_client_add_cmd_option(void *cookie, const char *opt)
335 {
336         struct fio_client *client = cookie;
337         struct flist_head *entry;
338
339         if (!client || !opt)
340                 return;
341
342         __fio_client_add_cmd_option(client, opt);
343
344         /*
345          * Duplicate arguments to shared client group
346          */
347         flist_for_each(entry, &arg_list) {
348                 client = flist_entry(entry, struct fio_client, arg_list);
349
350                 __fio_client_add_cmd_option(client, opt);
351         }
352 }
353
354 struct fio_client *fio_client_add_explicit(struct client_ops *ops,
355                                            const char *hostname, int type,
356                                            int port)
357 {
358         struct fio_client *client;
359
360         client = malloc(sizeof(*client));
361         memset(client, 0, sizeof(*client));
362
363         INIT_FLIST_HEAD(&client->list);
364         INIT_FLIST_HEAD(&client->hash_list);
365         INIT_FLIST_HEAD(&client->arg_list);
366         INIT_FLIST_HEAD(&client->eta_list);
367         INIT_FLIST_HEAD(&client->cmd_list);
368
369         client->hostname = strdup(hostname);
370
371         if (type == Fio_client_socket)
372                 client->is_sock = true;
373         else {
374                 int ipv6;
375
376                 ipv6 = type == Fio_client_ipv6;
377                 if (fio_server_parse_host(hostname, ipv6,
378                                                 &client->addr.sin_addr,
379                                                 &client->addr6.sin6_addr))
380                         goto err;
381
382                 client->port = port;
383         }
384
385         client->fd = -1;
386         client->ops = ops;
387         client->refs = 1;
388         client->type = ops->client_type;
389
390         __fio_client_add_cmd_option(client, "fio");
391
392         flist_add(&client->list, &client_list);
393         nr_clients++;
394         dprint(FD_NET, "client: added <%s>\n", client->hostname);
395         return client;
396 err:
397         free(client);
398         return NULL;
399 }
400
401 int fio_client_add_ini_file(void *cookie, const char *ini_file, bool remote)
402 {
403         struct fio_client *client = cookie;
404         struct client_file *cf;
405         size_t new_size;
406         void *new_files;
407
408         if (!client)
409                 return 1;
410
411         dprint(FD_NET, "client <%s>: add ini %s\n", client->hostname, ini_file);
412
413         new_size = (client->nr_files + 1) * sizeof(struct client_file);
414         new_files = realloc(client->files, new_size);
415         if (!new_files)
416                 return 1;
417
418         client->files = new_files;
419         cf = &client->files[client->nr_files];
420         cf->file = strdup(ini_file);
421         cf->remote = remote;
422         client->nr_files++;
423         return 0;
424 }
425
426 int fio_client_add(struct client_ops *ops, const char *hostname, void **cookie)
427 {
428         struct fio_client *existing = *cookie;
429         struct fio_client *client;
430
431         if (existing) {
432                 /*
433                  * We always add our "exec" name as the option, hence 1
434                  * means empty.
435                  */
436                 if (existing->argc == 1)
437                         flist_add_tail(&existing->arg_list, &arg_list);
438                 else {
439                         while (!flist_empty(&arg_list))
440                                 flist_del_init(arg_list.next);
441                 }
442         }
443
444         client = malloc(sizeof(*client));
445         memset(client, 0, sizeof(*client));
446
447         INIT_FLIST_HEAD(&client->list);
448         INIT_FLIST_HEAD(&client->hash_list);
449         INIT_FLIST_HEAD(&client->arg_list);
450         INIT_FLIST_HEAD(&client->eta_list);
451         INIT_FLIST_HEAD(&client->cmd_list);
452
453         if (fio_server_parse_string(hostname, &client->hostname,
454                                         &client->is_sock, &client->port,
455                                         &client->addr.sin_addr,
456                                         &client->addr6.sin6_addr,
457                                         &client->ipv6))
458                 return -1;
459
460         client->fd = -1;
461         client->ops = ops;
462         client->refs = 1;
463         client->type = ops->client_type;
464
465         __fio_client_add_cmd_option(client, "fio");
466
467         flist_add(&client->list, &client_list);
468         nr_clients++;
469         dprint(FD_NET, "client: added <%s>\n", client->hostname);
470         *cookie = client;
471         return 0;
472 }
473
474 static const char *server_name(struct fio_client *client, char *buf,
475                                size_t bufsize)
476 {
477         const char *from;
478
479         if (client->ipv6)
480                 from = inet_ntop(AF_INET6, (struct sockaddr *) &client->addr6.sin6_addr, buf, bufsize);
481         else if (client->is_sock)
482                 from = "sock";
483         else
484                 from = inet_ntop(AF_INET, (struct sockaddr *) &client->addr.sin_addr, buf, bufsize);
485
486         return from;
487 }
488
489 static void probe_client(struct fio_client *client)
490 {
491         struct cmd_client_probe_pdu pdu;
492         const char *sname;
493         uint64_t tag;
494         char buf[64];
495
496         dprint(FD_NET, "client: send probe\n");
497
498 #ifdef CONFIG_ZLIB
499         pdu.flags = __le64_to_cpu(FIO_PROBE_FLAG_ZLIB);
500 #else
501         pdu.flags = 0;
502 #endif
503
504         sname = server_name(client, buf, sizeof(buf));
505         memset(pdu.server, 0, sizeof(pdu.server));
506         strncpy((char *) pdu.server, sname, sizeof(pdu.server) - 1);
507
508         fio_net_send_cmd(client->fd, FIO_NET_CMD_PROBE, &pdu, sizeof(pdu), &tag, &client->cmd_list);
509 }
510
511 static int fio_client_connect_ip(struct fio_client *client)
512 {
513         struct sockaddr *addr;
514         socklen_t socklen;
515         int fd, domain;
516
517         if (client->ipv6) {
518                 client->addr6.sin6_family = AF_INET6;
519                 client->addr6.sin6_port = htons(client->port);
520                 domain = AF_INET6;
521                 addr = (struct sockaddr *) &client->addr6;
522                 socklen = sizeof(client->addr6);
523         } else {
524                 client->addr.sin_family = AF_INET;
525                 client->addr.sin_port = htons(client->port);
526                 domain = AF_INET;
527                 addr = (struct sockaddr *) &client->addr;
528                 socklen = sizeof(client->addr);
529         }
530
531         fd = socket(domain, SOCK_STREAM, 0);
532         if (fd < 0) {
533                 int ret = -errno;
534
535                 log_err("fio: socket: %s\n", strerror(errno));
536                 return ret;
537         }
538
539         if (connect(fd, addr, socklen) < 0) {
540                 int ret = -errno;
541
542                 log_err("fio: connect: %s\n", strerror(errno));
543                 log_err("fio: failed to connect to %s:%u\n", client->hostname,
544                                                                 client->port);
545                 close(fd);
546                 return ret;
547         }
548
549         return fd;
550 }
551
552 static int fio_client_connect_sock(struct fio_client *client)
553 {
554         struct sockaddr_un *addr = &client->addr_un;
555         socklen_t len;
556         int fd;
557
558         memset(addr, 0, sizeof(*addr));
559         addr->sun_family = AF_UNIX;
560         strncpy(addr->sun_path, client->hostname, sizeof(addr->sun_path) - 1);
561
562         fd = socket(AF_UNIX, SOCK_STREAM, 0);
563         if (fd < 0) {
564                 int ret = -errno;
565
566                 log_err("fio: socket: %s\n", strerror(errno));
567                 return ret;
568         }
569
570         len = sizeof(addr->sun_family) + strlen(addr->sun_path) + 1;
571         if (connect(fd, (struct sockaddr *) addr, len) < 0) {
572                 int ret = -errno;
573
574                 log_err("fio: connect; %s\n", strerror(errno));
575                 close(fd);
576                 return ret;
577         }
578
579         return fd;
580 }
581
582 int fio_client_connect(struct fio_client *client)
583 {
584         int fd;
585
586         dprint(FD_NET, "client: connect to host %s\n", client->hostname);
587
588         if (client->is_sock)
589                 fd = fio_client_connect_sock(client);
590         else
591                 fd = fio_client_connect_ip(client);
592
593         dprint(FD_NET, "client: %s connected %d\n", client->hostname, fd);
594
595         if (fd < 0)
596                 return fd;
597
598         client->fd = fd;
599         fio_client_add_hash(client);
600         client->state = Client_connected;
601
602         probe_client(client);
603         return 0;
604 }
605
606 int fio_client_terminate(struct fio_client *client)
607 {
608         return fio_net_send_quit(client->fd);
609 }
610
611 static void fio_clients_terminate(void)
612 {
613         struct flist_head *entry;
614         struct fio_client *client;
615
616         dprint(FD_NET, "client: terminate clients\n");
617
618         flist_for_each(entry, &client_list) {
619                 client = flist_entry(entry, struct fio_client, list);
620                 fio_client_terminate(client);
621         }
622 }
623
624 static void sig_int(int sig)
625 {
626         dprint(FD_NET, "client: got signal %d\n", sig);
627         fio_clients_terminate();
628 }
629
630 static void client_signal_handler(void)
631 {
632         struct sigaction act;
633
634         memset(&act, 0, sizeof(act));
635         act.sa_handler = sig_int;
636         act.sa_flags = SA_RESTART;
637         sigaction(SIGINT, &act, NULL);
638
639         memset(&act, 0, sizeof(act));
640         act.sa_handler = sig_int;
641         act.sa_flags = SA_RESTART;
642         sigaction(SIGTERM, &act, NULL);
643
644 /* Windows uses SIGBREAK as a quit signal from other applications */
645 #ifdef WIN32
646         memset(&act, 0, sizeof(act));
647         act.sa_handler = sig_int;
648         act.sa_flags = SA_RESTART;
649         sigaction(SIGBREAK, &act, NULL);
650 #endif
651
652         memset(&act, 0, sizeof(act));
653         act.sa_handler = sig_show_status;
654         act.sa_flags = SA_RESTART;
655         sigaction(SIGUSR1, &act, NULL);
656 }
657
658 static int send_client_cmd_line(struct fio_client *client)
659 {
660         struct cmd_single_line_pdu *cslp;
661         struct cmd_line_pdu *clp;
662         unsigned long offset;
663         unsigned int *lens;
664         void *pdu;
665         size_t mem;
666         int i, ret;
667
668         dprint(FD_NET, "client: send cmdline %d\n", client->argc);
669
670         lens = malloc(client->argc * sizeof(unsigned int));
671
672         /*
673          * Find out how much mem we need
674          */
675         for (i = 0, mem = 0; i < client->argc; i++) {
676                 lens[i] = strlen(client->argv[i]) + 1;
677                 mem += lens[i];
678         }
679
680         /*
681          * We need one cmd_line_pdu, and argc number of cmd_single_line_pdu
682          */
683         mem += sizeof(*clp) + (client->argc * sizeof(*cslp));
684
685         pdu = malloc(mem);
686         clp = pdu;
687         offset = sizeof(*clp);
688
689         for (i = 0; i < client->argc; i++) {
690                 uint16_t arg_len = lens[i];
691
692                 cslp = pdu + offset;
693                 strcpy((char *) cslp->text, client->argv[i]);
694                 cslp->len = cpu_to_le16(arg_len);
695                 offset += sizeof(*cslp) + arg_len;
696         }
697
698         free(lens);
699         clp->lines = cpu_to_le16(client->argc);
700         clp->client_type = __cpu_to_le16(client->type);
701         ret = fio_net_send_cmd(client->fd, FIO_NET_CMD_JOBLINE, pdu, mem, NULL, NULL);
702         free(pdu);
703         return ret;
704 }
705
706 int fio_clients_connect(void)
707 {
708         struct fio_client *client;
709         struct flist_head *entry, *tmp;
710         int ret;
711
712 #ifdef WIN32
713         WSADATA wsd;
714         WSAStartup(MAKEWORD(2, 2), &wsd);
715 #endif
716
717         dprint(FD_NET, "client: connect all\n");
718
719         client_signal_handler();
720
721         flist_for_each_safe(entry, tmp, &client_list) {
722                 client = flist_entry(entry, struct fio_client, list);
723
724                 ret = fio_client_connect(client);
725                 if (ret) {
726                         remove_client(client);
727                         continue;
728                 }
729
730                 if (client->argc > 1)
731                         send_client_cmd_line(client);
732         }
733
734         return !nr_clients;
735 }
736
737 int fio_start_client(struct fio_client *client)
738 {
739         dprint(FD_NET, "client: start %s\n", client->hostname);
740         return fio_net_send_simple_cmd(client->fd, FIO_NET_CMD_RUN, 0, NULL);
741 }
742
743 int fio_start_all_clients(void)
744 {
745         struct fio_client *client;
746         struct flist_head *entry, *tmp;
747         int ret;
748
749         dprint(FD_NET, "client: start all\n");
750
751         fio_client_json_init();
752
753         flist_for_each_safe(entry, tmp, &client_list) {
754                 client = flist_entry(entry, struct fio_client, list);
755
756                 ret = fio_start_client(client);
757                 if (ret) {
758                         remove_client(client);
759                         continue;
760                 }
761         }
762
763         return flist_empty(&client_list);
764 }
765
766 static int __fio_client_send_remote_ini(struct fio_client *client,
767                                         const char *filename)
768 {
769         struct cmd_load_file_pdu *pdu;
770         size_t p_size;
771         int ret;
772
773         dprint(FD_NET, "send remote ini %s to %s\n", filename, client->hostname);
774
775         p_size = sizeof(*pdu) + strlen(filename) + 1;
776         pdu = malloc(p_size);
777         memset(pdu, 0, p_size);
778         pdu->name_len = strlen(filename);
779         strcpy((char *) pdu->file, filename);
780         pdu->client_type = cpu_to_le16((uint16_t) client->type);
781
782         client->sent_job = true;
783         ret = fio_net_send_cmd(client->fd, FIO_NET_CMD_LOAD_FILE, pdu, p_size,NULL, NULL);
784         free(pdu);
785         return ret;
786 }
787
788 /*
789  * Send file contents to server backend. We could use sendfile(), but to remain
790  * more portable lets just read/write the darn thing.
791  */
792 static int __fio_client_send_local_ini(struct fio_client *client,
793                                        const char *filename)
794 {
795         struct cmd_job_pdu *pdu;
796         size_t p_size;
797         struct stat sb;
798         char *p;
799         void *buf;
800         off_t len;
801         int fd, ret;
802
803         dprint(FD_NET, "send ini %s to %s\n", filename, client->hostname);
804
805         fd = open(filename, O_RDONLY);
806         if (fd < 0) {
807                 ret = -errno;
808                 log_err("fio: job file <%s> open: %s\n", filename, strerror(errno));
809                 return ret;
810         }
811
812         if (fstat(fd, &sb) < 0) {
813                 ret = -errno;
814                 log_err("fio: job file stat: %s\n", strerror(errno));
815                 close(fd);
816                 return ret;
817         }
818
819         /*
820          * Add extra space for variable expansion, but doesn't guarantee.
821          */
822         sb.st_size += OPT_LEN_MAX;
823         p_size = sb.st_size + sizeof(*pdu);
824         pdu = malloc(p_size);
825         buf = pdu->buf;
826
827         len = sb.st_size;
828         p = buf;
829         if (read_ini_data(fd, p, len)) {
830                 log_err("fio: failed reading job file %s\n", filename);
831                 close(fd);
832                 free(pdu);
833                 return 1;
834         }
835
836         pdu->buf_len = __cpu_to_le32(sb.st_size);
837         pdu->client_type = cpu_to_le32(client->type);
838
839         client->sent_job = true;
840         ret = fio_net_send_cmd(client->fd, FIO_NET_CMD_JOB, pdu, p_size, NULL, NULL);
841         free(pdu);
842         close(fd);
843         return ret;
844 }
845
846 int fio_client_send_ini(struct fio_client *client, const char *filename,
847                         bool remote)
848 {
849         int ret;
850
851         if (!remote)
852                 ret = __fio_client_send_local_ini(client, filename);
853         else
854                 ret = __fio_client_send_remote_ini(client, filename);
855
856         if (!ret)
857                 client->sent_job = true;
858
859         return ret;
860 }
861
862 static int fio_client_send_cf(struct fio_client *client,
863                               struct client_file *cf)
864 {
865         return fio_client_send_ini(client, cf->file, cf->remote);
866 }
867
868 int fio_clients_send_ini(const char *filename)
869 {
870         struct fio_client *client;
871         struct flist_head *entry, *tmp;
872
873         flist_for_each_safe(entry, tmp, &client_list) {
874                 bool failed = false;
875
876                 client = flist_entry(entry, struct fio_client, list);
877
878                 if (client->nr_files) {
879                         int i;
880
881                         for (i = 0; i < client->nr_files; i++) {
882                                 struct client_file *cf;
883
884                                 cf = &client->files[i];
885
886                                 if (fio_client_send_cf(client, cf)) {
887                                         failed = true;
888                                         remove_client(client);
889                                         break;
890                                 }
891                         }
892                 }
893                 if (client->sent_job || failed)
894                         continue;
895                 if (!filename || fio_client_send_ini(client, filename, 0))
896                         remove_client(client);
897         }
898
899         return !nr_clients;
900 }
901
902 int fio_client_update_options(struct fio_client *client,
903                               struct thread_options *o, uint64_t *tag)
904 {
905         struct cmd_add_job_pdu pdu;
906
907         pdu.thread_number = cpu_to_le32(client->thread_number);
908         pdu.groupid = cpu_to_le32(client->groupid);
909         convert_thread_options_to_net(&pdu.top, o);
910
911         return fio_net_send_cmd(client->fd, FIO_NET_CMD_UPDATE_JOB, &pdu, sizeof(pdu), tag, &client->cmd_list);
912 }
913
914 static void convert_io_stat(struct io_stat *dst, struct io_stat *src)
915 {
916         dst->max_val    = le64_to_cpu(src->max_val);
917         dst->min_val    = le64_to_cpu(src->min_val);
918         dst->samples    = le64_to_cpu(src->samples);
919
920         /*
921          * Floats arrive as IEEE 754 encoded uint64_t, convert back to double
922          */
923         dst->mean.u.f   = fio_uint64_to_double(le64_to_cpu(dst->mean.u.i));
924         dst->S.u.f      = fio_uint64_to_double(le64_to_cpu(dst->S.u.i));
925 }
926
927 static void convert_ts(struct thread_stat *dst, struct thread_stat *src)
928 {
929         int i, j;
930
931         dst->error              = le32_to_cpu(src->error);
932         dst->thread_number      = le32_to_cpu(src->thread_number);
933         dst->groupid            = le32_to_cpu(src->groupid);
934         dst->pid                = le32_to_cpu(src->pid);
935         dst->members            = le32_to_cpu(src->members);
936         dst->unified_rw_rep     = le32_to_cpu(src->unified_rw_rep);
937
938         for (i = 0; i < DDIR_RWDIR_CNT; i++) {
939                 convert_io_stat(&dst->clat_stat[i], &src->clat_stat[i]);
940                 convert_io_stat(&dst->slat_stat[i], &src->slat_stat[i]);
941                 convert_io_stat(&dst->lat_stat[i], &src->lat_stat[i]);
942                 convert_io_stat(&dst->bw_stat[i], &src->bw_stat[i]);
943                 convert_io_stat(&dst->iops_stat[i], &src->iops_stat[i]);
944         }
945
946         dst->usr_time           = le64_to_cpu(src->usr_time);
947         dst->sys_time           = le64_to_cpu(src->sys_time);
948         dst->ctx                = le64_to_cpu(src->ctx);
949         dst->minf               = le64_to_cpu(src->minf);
950         dst->majf               = le64_to_cpu(src->majf);
951         dst->clat_percentiles   = le32_to_cpu(src->clat_percentiles);
952         dst->lat_percentiles    = le32_to_cpu(src->lat_percentiles);
953         dst->percentile_precision = le64_to_cpu(src->percentile_precision);
954
955         for (i = 0; i < FIO_IO_U_LIST_MAX_LEN; i++) {
956                 fio_fp64_t *fps = &src->percentile_list[i];
957                 fio_fp64_t *fpd = &dst->percentile_list[i];
958
959                 fpd->u.f = fio_uint64_to_double(le64_to_cpu(fps->u.i));
960         }
961
962         for (i = 0; i < FIO_IO_U_MAP_NR; i++) {
963                 dst->io_u_map[i]        = le64_to_cpu(src->io_u_map[i]);
964                 dst->io_u_submit[i]     = le64_to_cpu(src->io_u_submit[i]);
965                 dst->io_u_complete[i]   = le64_to_cpu(src->io_u_complete[i]);
966         }
967
968         for (i = 0; i < FIO_IO_U_LAT_N_NR; i++)
969                 dst->io_u_lat_n[i]      = le64_to_cpu(src->io_u_lat_n[i]);
970         for (i = 0; i < FIO_IO_U_LAT_U_NR; i++)
971                 dst->io_u_lat_u[i]      = le64_to_cpu(src->io_u_lat_u[i]);
972         for (i = 0; i < FIO_IO_U_LAT_M_NR; i++)
973                 dst->io_u_lat_m[i]      = le64_to_cpu(src->io_u_lat_m[i]);
974
975         for (i = 0; i < DDIR_RWDIR_CNT; i++)
976                 for (j = 0; j < FIO_IO_U_PLAT_NR; j++)
977                         dst->io_u_plat[i][j] = le64_to_cpu(src->io_u_plat[i][j]);
978
979         for (i = 0; i < DDIR_RWDIR_CNT; i++) {
980                 dst->total_io_u[i]      = le64_to_cpu(src->total_io_u[i]);
981                 dst->short_io_u[i]      = le64_to_cpu(src->short_io_u[i]);
982                 dst->drop_io_u[i]       = le64_to_cpu(src->drop_io_u[i]);
983         }
984
985         dst->total_submit       = le64_to_cpu(src->total_submit);
986         dst->total_complete     = le64_to_cpu(src->total_complete);
987
988         for (i = 0; i < DDIR_RWDIR_CNT; i++) {
989                 dst->io_bytes[i]        = le64_to_cpu(src->io_bytes[i]);
990                 dst->runtime[i]         = le64_to_cpu(src->runtime[i]);
991         }
992
993         dst->total_run_time     = le64_to_cpu(src->total_run_time);
994         dst->continue_on_error  = le16_to_cpu(src->continue_on_error);
995         dst->total_err_count    = le64_to_cpu(src->total_err_count);
996         dst->first_error        = le32_to_cpu(src->first_error);
997         dst->kb_base            = le32_to_cpu(src->kb_base);
998         dst->unit_base          = le32_to_cpu(src->unit_base);
999
1000         dst->sig_figs           = le32_to_cpu(src->sig_figs);
1001
1002         dst->latency_depth      = le32_to_cpu(src->latency_depth);
1003         dst->latency_target     = le64_to_cpu(src->latency_target);
1004         dst->latency_window     = le64_to_cpu(src->latency_window);
1005         dst->latency_percentile.u.f = fio_uint64_to_double(le64_to_cpu(src->latency_percentile.u.i));
1006
1007         dst->nr_block_infos     = le64_to_cpu(src->nr_block_infos);
1008         for (i = 0; i < dst->nr_block_infos; i++)
1009                 dst->block_infos[i] = le32_to_cpu(src->block_infos[i]);
1010
1011         dst->ss_dur             = le64_to_cpu(src->ss_dur);
1012         dst->ss_state           = le32_to_cpu(src->ss_state);
1013         dst->ss_head            = le32_to_cpu(src->ss_head);
1014         dst->ss_limit.u.f       = fio_uint64_to_double(le64_to_cpu(src->ss_limit.u.i));
1015         dst->ss_slope.u.f       = fio_uint64_to_double(le64_to_cpu(src->ss_slope.u.i));
1016         dst->ss_deviation.u.f   = fio_uint64_to_double(le64_to_cpu(src->ss_deviation.u.i));
1017         dst->ss_criterion.u.f   = fio_uint64_to_double(le64_to_cpu(src->ss_criterion.u.i));
1018
1019         if (dst->ss_state & FIO_SS_DATA) {
1020                 for (i = 0; i < dst->ss_dur; i++ ) {
1021                         dst->ss_iops_data[i] = le64_to_cpu(src->ss_iops_data[i]);
1022                         dst->ss_bw_data[i] = le64_to_cpu(src->ss_bw_data[i]);
1023                 }
1024         }
1025 }
1026
1027 static void convert_gs(struct group_run_stats *dst, struct group_run_stats *src)
1028 {
1029         int i;
1030
1031         for (i = 0; i < DDIR_RWDIR_CNT; i++) {
1032                 dst->max_run[i]         = le64_to_cpu(src->max_run[i]);
1033                 dst->min_run[i]         = le64_to_cpu(src->min_run[i]);
1034                 dst->max_bw[i]          = le64_to_cpu(src->max_bw[i]);
1035                 dst->min_bw[i]          = le64_to_cpu(src->min_bw[i]);
1036                 dst->iobytes[i]         = le64_to_cpu(src->iobytes[i]);
1037                 dst->agg[i]             = le64_to_cpu(src->agg[i]);
1038         }
1039
1040         dst->kb_base    = le32_to_cpu(src->kb_base);
1041         dst->unit_base  = le32_to_cpu(src->unit_base);
1042         dst->sig_figs   = le32_to_cpu(src->sig_figs);
1043         dst->groupid    = le32_to_cpu(src->groupid);
1044         dst->unified_rw_rep     = le32_to_cpu(src->unified_rw_rep);
1045 }
1046
1047 static void json_object_add_client_info(struct json_object *obj,
1048                                         struct fio_client *client)
1049 {
1050         const char *hostname = client->hostname ? client->hostname : "";
1051
1052         json_object_add_value_string(obj, "hostname", hostname);
1053         json_object_add_value_int(obj, "port", client->port);
1054 }
1055
1056 static void handle_ts(struct fio_client *client, struct fio_net_cmd *cmd)
1057 {
1058         struct cmd_ts_pdu *p = (struct cmd_ts_pdu *) cmd->payload;
1059         struct flist_head *opt_list = NULL;
1060         struct json_object *tsobj;
1061
1062         if (output_format & FIO_OUTPUT_TERSE)
1063                 return;
1064
1065         if (client->opt_lists && p->ts.thread_number <= client->jobs)
1066                 opt_list = &client->opt_lists[p->ts.thread_number - 1];
1067
1068         tsobj = show_thread_status(&p->ts, &p->rs, opt_list, NULL);
1069         client->did_stat = true;
1070         if (tsobj) {
1071                 json_object_add_client_info(tsobj, client);
1072                 json_array_add_value_object(clients_array, tsobj);
1073         }
1074
1075         if (sum_stat_clients <= 1)
1076                 return;
1077
1078         sum_thread_stats(&client_ts, &p->ts, sum_stat_nr == 1);
1079         sum_group_stats(&client_gs, &p->rs);
1080
1081         client_ts.members++;
1082         client_ts.thread_number = p->ts.thread_number;
1083         client_ts.groupid = p->ts.groupid;
1084         client_ts.unified_rw_rep = p->ts.unified_rw_rep;
1085         client_ts.sig_figs = p->ts.sig_figs;
1086
1087         if (++sum_stat_nr == sum_stat_clients) {
1088                 strcpy(client_ts.name, "All clients");
1089                 tsobj = show_thread_status(&client_ts, &client_gs, NULL, NULL);
1090                 if (tsobj) {
1091                         json_object_add_client_info(tsobj, client);
1092                         json_array_add_value_object(clients_array, tsobj);
1093                 }
1094         }
1095 }
1096
1097 static void handle_gs(struct fio_client *client, struct fio_net_cmd *cmd)
1098 {
1099         struct group_run_stats *gs = (struct group_run_stats *) cmd->payload;
1100
1101         if (output_format & FIO_OUTPUT_TERSE)
1102                 return;
1103
1104         if (output_format & FIO_OUTPUT_NORMAL)
1105                 show_group_stats(gs, NULL);
1106 }
1107
1108 static void handle_job_opt(struct fio_client *client, struct fio_net_cmd *cmd)
1109 {
1110         struct cmd_job_option *pdu = (struct cmd_job_option *) cmd->payload;
1111         struct print_option *p;
1112
1113         if (!job_opt_object)
1114                 return;
1115
1116         pdu->global = le16_to_cpu(pdu->global);
1117         pdu->truncated = le16_to_cpu(pdu->truncated);
1118         pdu->groupid = le32_to_cpu(pdu->groupid);
1119
1120         p = malloc(sizeof(*p));
1121         p->name = strdup((char *) pdu->name);
1122         if (pdu->value[0] != '\0')
1123                 p->value = strdup((char *) pdu->value);
1124         else
1125                 p->value = NULL;
1126
1127         if (pdu->global) {
1128                 const char *pos = "";
1129
1130                 if (p->value)
1131                         pos = p->value;
1132
1133                 json_object_add_value_string(job_opt_object, p->name, pos);
1134         } else if (client->opt_lists) {
1135                 struct flist_head *opt_list = &client->opt_lists[pdu->groupid];
1136
1137                 flist_add_tail(&p->list, opt_list);
1138         }
1139 }
1140
1141 static void handle_text(struct fio_client *client, struct fio_net_cmd *cmd)
1142 {
1143         struct cmd_text_pdu *pdu = (struct cmd_text_pdu *) cmd->payload;
1144         const char *buf = (const char *) pdu->buf;
1145         const char *name;
1146         int fio_unused ret;
1147
1148         name = client->name ? client->name : client->hostname;
1149
1150         if (!client->skip_newline && !(output_format & FIO_OUTPUT_TERSE))
1151                 fprintf(f_out, "<%s> ", name);
1152         ret = fwrite(buf, pdu->buf_len, 1, f_out);
1153         fflush(f_out);
1154         client->skip_newline = strchr(buf, '\n') == NULL;
1155 }
1156
1157 static void convert_agg(struct disk_util_agg *agg)
1158 {
1159         int i;
1160
1161         for (i = 0; i < 2; i++) {
1162                 agg->ios[i]     = le64_to_cpu(agg->ios[i]);
1163                 agg->merges[i]  = le64_to_cpu(agg->merges[i]);
1164                 agg->sectors[i] = le64_to_cpu(agg->sectors[i]);
1165                 agg->ticks[i]   = le64_to_cpu(agg->ticks[i]);
1166         }
1167
1168         agg->io_ticks           = le64_to_cpu(agg->io_ticks);
1169         agg->time_in_queue      = le64_to_cpu(agg->time_in_queue);
1170         agg->slavecount         = le32_to_cpu(agg->slavecount);
1171         agg->max_util.u.f       = fio_uint64_to_double(le64_to_cpu(agg->max_util.u.i));
1172 }
1173
1174 static void convert_dus(struct disk_util_stat *dus)
1175 {
1176         int i;
1177
1178         for (i = 0; i < 2; i++) {
1179                 dus->s.ios[i]           = le64_to_cpu(dus->s.ios[i]);
1180                 dus->s.merges[i]        = le64_to_cpu(dus->s.merges[i]);
1181                 dus->s.sectors[i]       = le64_to_cpu(dus->s.sectors[i]);
1182                 dus->s.ticks[i]         = le64_to_cpu(dus->s.ticks[i]);
1183         }
1184
1185         dus->s.io_ticks         = le64_to_cpu(dus->s.io_ticks);
1186         dus->s.time_in_queue    = le64_to_cpu(dus->s.time_in_queue);
1187         dus->s.msec             = le64_to_cpu(dus->s.msec);
1188 }
1189
1190 static void handle_du(struct fio_client *client, struct fio_net_cmd *cmd)
1191 {
1192         struct cmd_du_pdu *du = (struct cmd_du_pdu *) cmd->payload;
1193
1194         if (output_format & FIO_OUTPUT_TERSE)
1195                 return;
1196
1197         if (!client->disk_stats_shown) {
1198                 client->disk_stats_shown = true;
1199                 if (!(output_format & FIO_OUTPUT_JSON))
1200                         log_info("\nDisk stats (read/write):\n");
1201         }
1202
1203         if (output_format & FIO_OUTPUT_JSON) {
1204                 struct json_object *duobj;
1205                 json_array_add_disk_util(&du->dus, &du->agg, du_array);
1206                 duobj = json_array_last_value_object(du_array);
1207                 json_object_add_client_info(duobj, client);
1208         }
1209         if (output_format & FIO_OUTPUT_NORMAL)
1210                 print_disk_util(&du->dus, &du->agg, 0, NULL);
1211 }
1212
1213 static void convert_jobs_eta(struct jobs_eta *je)
1214 {
1215         int i;
1216
1217         je->nr_running          = le32_to_cpu(je->nr_running);
1218         je->nr_ramp             = le32_to_cpu(je->nr_ramp);
1219         je->nr_pending          = le32_to_cpu(je->nr_pending);
1220         je->nr_setting_up       = le32_to_cpu(je->nr_setting_up);
1221         je->files_open          = le32_to_cpu(je->files_open);
1222
1223         for (i = 0; i < DDIR_RWDIR_CNT; i++) {
1224                 je->m_rate[i]   = le64_to_cpu(je->m_rate[i]);
1225                 je->t_rate[i]   = le64_to_cpu(je->t_rate[i]);
1226                 je->m_iops[i]   = le32_to_cpu(je->m_iops[i]);
1227                 je->t_iops[i]   = le32_to_cpu(je->t_iops[i]);
1228                 je->rate[i]     = le64_to_cpu(je->rate[i]);
1229                 je->iops[i]     = le32_to_cpu(je->iops[i]);
1230         }
1231
1232         je->elapsed_sec         = le64_to_cpu(je->elapsed_sec);
1233         je->eta_sec             = le64_to_cpu(je->eta_sec);
1234         je->nr_threads          = le32_to_cpu(je->nr_threads);
1235         je->is_pow2             = le32_to_cpu(je->is_pow2);
1236         je->unit_base           = le32_to_cpu(je->unit_base);
1237         je->sig_figs            = le32_to_cpu(je->sig_figs);
1238 }
1239
1240 void fio_client_sum_jobs_eta(struct jobs_eta *dst, struct jobs_eta *je)
1241 {
1242         int i;
1243
1244         dst->nr_running         += je->nr_running;
1245         dst->nr_ramp            += je->nr_ramp;
1246         dst->nr_pending         += je->nr_pending;
1247         dst->nr_setting_up      += je->nr_setting_up;
1248         dst->files_open         += je->files_open;
1249
1250         for (i = 0; i < DDIR_RWDIR_CNT; i++) {
1251                 dst->m_rate[i]  += je->m_rate[i];
1252                 dst->t_rate[i]  += je->t_rate[i];
1253                 dst->m_iops[i]  += je->m_iops[i];
1254                 dst->t_iops[i]  += je->t_iops[i];
1255                 dst->rate[i]    += je->rate[i];
1256                 dst->iops[i]    += je->iops[i];
1257         }
1258
1259         dst->elapsed_sec        += je->elapsed_sec;
1260
1261         if (je->eta_sec > dst->eta_sec)
1262                 dst->eta_sec = je->eta_sec;
1263
1264         dst->nr_threads         += je->nr_threads;
1265
1266         /*
1267          * This wont be correct for multiple strings, but at least it
1268          * works for the basic cases.
1269          */
1270         strcpy((char *) dst->run_str, (char *) je->run_str);
1271 }
1272
1273 static bool remove_reply_cmd(struct fio_client *client, struct fio_net_cmd *cmd)
1274 {
1275         struct fio_net_cmd_reply *reply = NULL;
1276         struct flist_head *entry;
1277
1278         flist_for_each(entry, &client->cmd_list) {
1279                 reply = flist_entry(entry, struct fio_net_cmd_reply, list);
1280
1281                 if (cmd->tag == (uintptr_t) reply)
1282                         break;
1283
1284                 reply = NULL;
1285         }
1286
1287         if (!reply) {
1288                 log_err("fio: client: unable to find matching tag (%llx)\n", (unsigned long long) cmd->tag);
1289                 return false;
1290         }
1291
1292         flist_del(&reply->list);
1293         cmd->tag = reply->saved_tag;
1294         free(reply);
1295         return true;
1296 }
1297
1298 int fio_client_wait_for_reply(struct fio_client *client, uint64_t tag)
1299 {
1300         do {
1301                 struct fio_net_cmd_reply *reply = NULL;
1302                 struct flist_head *entry;
1303
1304                 flist_for_each(entry, &client->cmd_list) {
1305                         reply = flist_entry(entry, struct fio_net_cmd_reply, list);
1306
1307                         if (tag == (uintptr_t) reply)
1308                                 break;
1309
1310                         reply = NULL;
1311                 }
1312
1313                 if (!reply)
1314                         break;
1315
1316                 usleep(1000);
1317         } while (1);
1318
1319         return 0;
1320 }
1321
1322 static void handle_eta(struct fio_client *client, struct fio_net_cmd *cmd)
1323 {
1324         struct jobs_eta *je = (struct jobs_eta *) cmd->payload;
1325         struct client_eta *eta = (struct client_eta *) (uintptr_t) cmd->tag;
1326
1327         dprint(FD_NET, "client: got eta tag %p, %d\n", eta, eta->pending);
1328
1329         assert(client->eta_in_flight == eta);
1330
1331         client->eta_in_flight = NULL;
1332         flist_del_init(&client->eta_list);
1333         client->eta_timeouts = 0;
1334
1335         if (client->ops->jobs_eta)
1336                 client->ops->jobs_eta(client, je);
1337
1338         fio_client_sum_jobs_eta(&eta->eta, je);
1339         fio_client_dec_jobs_eta(eta, client->ops->eta);
1340 }
1341
1342 static void client_flush_hist_samples(FILE *f, int hist_coarseness, void *samples,
1343                                       uint64_t sample_size)
1344 {
1345         struct io_sample *s;
1346         int log_offset;
1347         uint64_t i, j, nr_samples;
1348         struct io_u_plat_entry *entry;
1349         uint64_t *io_u_plat;
1350
1351         int stride = 1 << hist_coarseness;
1352
1353         if (!sample_size)
1354                 return;
1355
1356         s = __get_sample(samples, 0, 0);
1357         log_offset = (s->__ddir & LOG_OFFSET_SAMPLE_BIT) != 0;
1358
1359         nr_samples = sample_size / __log_entry_sz(log_offset);
1360
1361         for (i = 0; i < nr_samples; i++) {
1362
1363                 s = (struct io_sample *)((char *)__get_sample(samples, log_offset, i) +
1364                         i * sizeof(struct io_u_plat_entry));
1365
1366                 entry = s->data.plat_entry;
1367                 io_u_plat = entry->io_u_plat;
1368
1369                 fprintf(f, "%lu, %u, %llu, ", (unsigned long) s->time,
1370                                                 io_sample_ddir(s), (unsigned long long) s->bs);
1371                 for (j = 0; j < FIO_IO_U_PLAT_NR - stride; j += stride) {
1372                         fprintf(f, "%llu, ", (unsigned long long)hist_sum(j, stride, io_u_plat, NULL));
1373                 }
1374                 fprintf(f, "%llu\n", (unsigned long long)
1375                         hist_sum(FIO_IO_U_PLAT_NR - stride, stride, io_u_plat, NULL));
1376
1377         }
1378 }
1379
1380 static int fio_client_handle_iolog(struct fio_client *client,
1381                                    struct fio_net_cmd *cmd)
1382 {
1383         struct cmd_iolog_pdu *pdu = NULL;
1384         bool store_direct;
1385         char *log_pathname = NULL;
1386         int ret = 0;
1387
1388         pdu = convert_iolog(cmd, &store_direct);
1389         if (!pdu) {
1390                 log_err("fio: failed converting IO log\n");
1391                 ret = 1;
1392                 goto out;
1393         }
1394
1395         /* allocate buffer big enough for next sprintf() call */
1396         log_pathname = malloc(10 + strlen((char *)pdu->name) +
1397                         strlen(client->hostname));
1398         if (!log_pathname) {
1399                 log_err("fio: memory allocation of unique pathname failed\n");
1400                 ret = -1;
1401                 goto out;
1402         }
1403         /* generate a unique pathname for the log file using hostname */
1404         sprintf(log_pathname, "%s.%s", pdu->name, client->hostname);
1405
1406         if (store_direct) {
1407                 ssize_t wrote;
1408                 size_t sz;
1409                 int fd;
1410
1411                 fd = open((const char *) log_pathname,
1412                                 O_WRONLY | O_CREAT | O_TRUNC, 0644);
1413                 if (fd < 0) {
1414                         log_err("fio: open log %s: %s\n",
1415                                 log_pathname, strerror(errno));
1416                         ret = 1;
1417                         goto out;
1418                 }
1419
1420                 sz = cmd->pdu_len - sizeof(*pdu);
1421                 wrote = write(fd, pdu->samples, sz);
1422                 close(fd);
1423
1424                 if (wrote != sz) {
1425                         log_err("fio: short write on compressed log\n");
1426                         ret = 1;
1427                         goto out;
1428                 }
1429
1430                 ret = 0;
1431         } else {
1432                 FILE *f;
1433                 f = fopen((const char *) log_pathname, "w");
1434                 if (!f) {
1435                         log_err("fio: fopen log %s : %s\n",
1436                                 log_pathname, strerror(errno));
1437                         ret = 1;
1438                         goto out;
1439                 }
1440
1441                 if (pdu->log_type == IO_LOG_TYPE_HIST) {
1442                         client_flush_hist_samples(f, pdu->log_hist_coarseness, pdu->samples,
1443                                            pdu->nr_samples * sizeof(struct io_sample));
1444                 } else {
1445                         flush_samples(f, pdu->samples,
1446                                         pdu->nr_samples * sizeof(struct io_sample));
1447                 }
1448                 fclose(f);
1449                 ret = 0;
1450         }
1451
1452 out:
1453         if (pdu && pdu != (void *) cmd->payload)
1454                 free(pdu);
1455
1456         if (log_pathname)
1457                 free(log_pathname);
1458
1459         return ret;
1460 }
1461
1462 static void handle_probe(struct fio_client *client, struct fio_net_cmd *cmd)
1463 {
1464         struct cmd_probe_reply_pdu *probe = (struct cmd_probe_reply_pdu *) cmd->payload;
1465         const char *os, *arch;
1466         char bit[16];
1467
1468         if (output_format & FIO_OUTPUT_TERSE)
1469                 return;
1470
1471         os = fio_get_os_string(probe->os);
1472         if (!os)
1473                 os = "unknown";
1474
1475         arch = fio_get_arch_string(probe->arch);
1476         if (!arch)
1477                 os = "unknown";
1478
1479         sprintf(bit, "%d-bit", probe->bpp * 8);
1480         probe->flags = le64_to_cpu(probe->flags);
1481
1482         if (!(output_format & FIO_OUTPUT_JSON))
1483                 log_info("hostname=%s, be=%u, %s, os=%s, arch=%s, fio=%s, flags=%lx\n",
1484                         probe->hostname, probe->bigendian, bit, os, arch,
1485                         probe->fio_version, (unsigned long) probe->flags);
1486
1487         if (!client->name)
1488                 client->name = strdup((char *) probe->hostname);
1489 }
1490
1491 static void handle_start(struct fio_client *client, struct fio_net_cmd *cmd)
1492 {
1493         struct cmd_start_pdu *pdu = (struct cmd_start_pdu *) cmd->payload;
1494
1495         client->state = Client_started;
1496         client->jobs = le32_to_cpu(pdu->jobs);
1497         client->nr_stat = le32_to_cpu(pdu->stat_outputs);
1498
1499         if (client->jobs) {
1500                 int i;
1501
1502                 if (client->opt_lists)
1503                         free(client->opt_lists);
1504
1505                 client->opt_lists = malloc(client->jobs * sizeof(struct flist_head));
1506                 for (i = 0; i < client->jobs; i++)
1507                         INIT_FLIST_HEAD(&client->opt_lists[i]);
1508         }
1509
1510         sum_stat_clients += client->nr_stat;
1511 }
1512
1513 static void handle_stop(struct fio_client *client)
1514 {
1515         if (client->error)
1516                 log_info("client <%s>: exited with error %d\n", client->hostname, client->error);
1517 }
1518
1519 static void convert_stop(struct fio_net_cmd *cmd)
1520 {
1521         struct cmd_end_pdu *pdu = (struct cmd_end_pdu *) cmd->payload;
1522
1523         pdu->error = le32_to_cpu(pdu->error);
1524 }
1525
1526 static void convert_text(struct fio_net_cmd *cmd)
1527 {
1528         struct cmd_text_pdu *pdu = (struct cmd_text_pdu *) cmd->payload;
1529
1530         pdu->level      = le32_to_cpu(pdu->level);
1531         pdu->buf_len    = le32_to_cpu(pdu->buf_len);
1532         pdu->log_sec    = le64_to_cpu(pdu->log_sec);
1533         pdu->log_usec   = le64_to_cpu(pdu->log_usec);
1534 }
1535
/*
 * Inflate a zlib-compressed IO log PDU.
 *
 * The PDU header is not compressed; it is copied verbatim (still in wire
 * byte order) to the start of a newly allocated buffer and the inflated
 * samples are placed right after it. The output size is derived from the
 * header's sample count, offset flag and log type.
 *
 * Returns the new buffer, which the caller must free(), or NULL on any
 * failure. Without CONFIG_ZLIB this always returns NULL.
 */
static struct cmd_iolog_pdu *convert_iolog_gz(struct fio_net_cmd *cmd,
                                              struct cmd_iolog_pdu *pdu)
{
#ifdef CONFIG_ZLIB
        struct cmd_iolog_pdu *ret = NULL;
        z_stream stream;
        uint64_t nr_samples;
        size_t entry_sz, total;
        char *p;

        stream.zalloc = Z_NULL;
        stream.zfree = Z_NULL;
        stream.opaque = Z_NULL;
        stream.avail_in = 0;
        stream.next_in = Z_NULL;

        if (inflateInit(&stream) != Z_OK)
                return NULL;

        /*
         * Get header first, it's not compressed. It is still in wire byte
         * order at this point, so every field read here needs converting.
         */
        nr_samples = le64_to_cpu(pdu->nr_samples);

        entry_sz = __log_entry_sz(le32_to_cpu(pdu->log_offset));
        if (le32_to_cpu(pdu->log_type) == IO_LOG_TYPE_HIST)
                entry_sz += sizeof(struct io_u_plat_entry);

        /* the sample count comes off the wire; don't let the size wrap */
        if (entry_sz && nr_samples > (SIZE_MAX - sizeof(*pdu)) / entry_sz) {
                log_err("fio: iolog sample count %llu too large\n",
                        (unsigned long long) nr_samples);
                goto out;
        }
        total = nr_samples * entry_sz;

        ret = malloc(total + sizeof(*pdu));
        if (!ret) {
                log_err("fio: failed allocating iolog buffer\n");
                goto out;
        }

        memcpy(ret, pdu, sizeof(*pdu));

        p = (char *) ret + sizeof(*pdu);

        stream.avail_in = cmd->pdu_len - sizeof(*pdu);
        stream.next_in = (void *)((char *) pdu + sizeof(*pdu));
        while (stream.avail_in) {
                unsigned int this_chunk = 65536;
                unsigned int this_len;
                int zret;

                /* never hand zlib more room than is left in the buffer */
                if (this_chunk > total)
                        this_chunk = total;

                stream.avail_out = this_chunk;
                stream.next_out = (void *)p;
                zret = inflate(&stream, Z_NO_FLUSH);
                /* may be Z_OK, or Z_STREAM_END */
                if (zret < 0) {
                        log_err("fio: inflate error %d\n", zret);
                        free(ret);
                        ret = NULL;
                        goto out;
                }

                this_len = this_chunk - stream.avail_out;
                p += this_len;
                total -= this_len;

                /*
                 * Once the stream has ended, further inflate() calls
                 * consume nothing; stop even if trailing input remains.
                 */
                if (zret == Z_STREAM_END)
                        break;
        }

out:
        inflateEnd(&stream);
        return ret;
#else
        return NULL;
#endif
}
1605
1606 /*
1607  * This has been compressed on the server side, since it can be big.
1608  * Uncompress here.
1609  */
1610 static struct cmd_iolog_pdu *convert_iolog(struct fio_net_cmd *cmd,
1611                                            bool *store_direct)
1612 {
1613         struct cmd_iolog_pdu *pdu = (struct cmd_iolog_pdu *) cmd->payload;
1614         struct cmd_iolog_pdu *ret;
1615         uint64_t i;
1616         int compressed;
1617         void *samples;
1618
1619         *store_direct = false;
1620
1621         /*
1622          * Convert if compressed and we support it. If it's not
1623          * compressed, we need not do anything.
1624          */
1625         compressed = le32_to_cpu(pdu->compressed);
1626         if (compressed == XMIT_COMPRESSED) {
1627 #ifndef CONFIG_ZLIB
1628                 log_err("fio: server sent compressed data by mistake\n");
1629                 return NULL;
1630 #endif
1631                 ret = convert_iolog_gz(cmd, pdu);
1632                 if (!ret) {
1633                         log_err("fio: failed decompressing log\n");
1634                         return NULL;
1635                 }
1636         } else if (compressed == STORE_COMPRESSED) {
1637                 *store_direct = true;
1638                 ret = pdu;
1639         } else
1640                 ret = pdu;
1641
1642         ret->nr_samples         = le64_to_cpu(ret->nr_samples);
1643         ret->thread_number      = le32_to_cpu(ret->thread_number);
1644         ret->log_type           = le32_to_cpu(ret->log_type);
1645         ret->compressed         = le32_to_cpu(ret->compressed);
1646         ret->log_offset         = le32_to_cpu(ret->log_offset);
1647         ret->log_hist_coarseness = le32_to_cpu(ret->log_hist_coarseness);
1648
1649         if (*store_direct)
1650                 return ret;
1651
1652         samples = &ret->samples[0];
1653         for (i = 0; i < ret->nr_samples; i++) {
1654                 struct io_sample *s;
1655
1656                 s = __get_sample(samples, ret->log_offset, i);
1657                 if (ret->log_type == IO_LOG_TYPE_HIST)
1658                         s = (struct io_sample *)((char *)s + sizeof(struct io_u_plat_entry) * i);
1659
1660                 s->time         = le64_to_cpu(s->time);
1661                 s->data.val     = le64_to_cpu(s->data.val);
1662                 s->__ddir       = le32_to_cpu(s->__ddir);
1663                 s->bs           = le64_to_cpu(s->bs);
1664
1665                 if (ret->log_offset) {
1666                         struct io_sample_offset *so = (void *) s;
1667
1668                         so->offset = le64_to_cpu(so->offset);
1669                 }
1670
1671                 if (ret->log_type == IO_LOG_TYPE_HIST) {
1672                         s->data.plat_entry = (struct io_u_plat_entry *)(((char *)s) + sizeof(*s));
1673                         s->data.plat_entry->list.next = NULL;
1674                         s->data.plat_entry->list.prev = NULL;
1675                 }
1676         }
1677
1678         return ret;
1679 }
1680
1681 static void sendfile_reply(int fd, struct cmd_sendfile_reply *rep,
1682                            size_t size, uint64_t tag)
1683 {
1684         rep->error = cpu_to_le32(rep->error);
1685         fio_net_send_cmd(fd, FIO_NET_CMD_SENDFILE, rep, size, &tag, NULL);
1686 }
1687
1688 static int fio_send_file(struct fio_client *client, struct cmd_sendfile *pdu,
1689                          uint64_t tag)
1690 {
1691         struct cmd_sendfile_reply *rep;
1692         struct stat sb;
1693         size_t size;
1694         int fd;
1695
1696         size = sizeof(*rep);
1697         rep = malloc(size);
1698
1699         if (stat((char *)pdu->path, &sb) < 0) {
1700 fail:
1701                 rep->error = errno;
1702                 sendfile_reply(client->fd, rep, size, tag);
1703                 free(rep);
1704                 return 1;
1705         }
1706
1707         size += sb.st_size;
1708         rep = realloc(rep, size);
1709         rep->size = cpu_to_le32((uint32_t) sb.st_size);
1710
1711         fd = open((char *)pdu->path, O_RDONLY);
1712         if (fd == -1 )
1713                 goto fail;
1714
1715         rep->error = read_data(fd, &rep->data, sb.st_size);
1716         sendfile_reply(client->fd, rep, size, tag);
1717         free(rep);
1718         close(fd);
1719         return 0;
1720 }
1721
1722 int fio_handle_client(struct fio_client *client)
1723 {
1724         struct client_ops *ops = client->ops;
1725         struct fio_net_cmd *cmd;
1726         int size;
1727
1728         dprint(FD_NET, "client: handle %s\n", client->hostname);
1729
1730         cmd = fio_net_recv_cmd(client->fd, true);
1731         if (!cmd)
1732                 return 0;
1733
1734         dprint(FD_NET, "client: got cmd op %s from %s (pdu=%u)\n",
1735                 fio_server_op(cmd->opcode), client->hostname, cmd->pdu_len);
1736
1737         client->last_cmd = cmd->opcode;
1738
1739         switch (cmd->opcode) {
1740         case FIO_NET_CMD_QUIT:
1741                 if (ops->quit)
1742                         ops->quit(client, cmd);
1743                 remove_client(client);
1744                 break;
1745         case FIO_NET_CMD_TEXT:
1746                 convert_text(cmd);
1747                 ops->text(client, cmd);
1748                 break;
1749         case FIO_NET_CMD_DU: {
1750                 struct cmd_du_pdu *du = (struct cmd_du_pdu *) cmd->payload;
1751
1752                 convert_dus(&du->dus);
1753                 convert_agg(&du->agg);
1754
1755                 ops->disk_util(client, cmd);
1756                 break;
1757                 }
1758         case FIO_NET_CMD_TS: {
1759                 struct cmd_ts_pdu *p = (struct cmd_ts_pdu *) cmd->payload;
1760
1761                 dprint(FD_NET, "client: ts->ss_state = %u\n", (unsigned int) le32_to_cpu(p->ts.ss_state));
1762                 if (le32_to_cpu(p->ts.ss_state) & FIO_SS_DATA) {
1763                         dprint(FD_NET, "client: received steadystate ring buffers\n");
1764
1765                         size = le64_to_cpu(p->ts.ss_dur);
1766                         p->ts.ss_iops_data = (uint64_t *) ((struct cmd_ts_pdu *)cmd->payload + 1);
1767                         p->ts.ss_bw_data = p->ts.ss_iops_data + size;
1768                 }
1769
1770                 convert_ts(&p->ts, &p->ts);
1771                 convert_gs(&p->rs, &p->rs);
1772
1773                 ops->thread_status(client, cmd);
1774                 break;
1775                 }
1776         case FIO_NET_CMD_GS: {
1777                 struct group_run_stats *gs = (struct group_run_stats *) cmd->payload;
1778
1779                 convert_gs(gs, gs);
1780
1781                 ops->group_stats(client, cmd);
1782                 break;
1783                 }
1784         case FIO_NET_CMD_ETA: {
1785                 struct jobs_eta *je = (struct jobs_eta *) cmd->payload;
1786
1787                 if (!remove_reply_cmd(client, cmd))
1788                         break;
1789                 convert_jobs_eta(je);
1790                 handle_eta(client, cmd);
1791                 break;
1792                 }
1793         case FIO_NET_CMD_PROBE:
1794                 remove_reply_cmd(client, cmd);
1795                 ops->probe(client, cmd);
1796                 break;
1797         case FIO_NET_CMD_SERVER_START:
1798                 client->state = Client_running;
1799                 if (ops->job_start)
1800                         ops->job_start(client, cmd);
1801                 break;
1802         case FIO_NET_CMD_START: {
1803                 struct cmd_start_pdu *pdu = (struct cmd_start_pdu *) cmd->payload;
1804
1805                 pdu->jobs = le32_to_cpu(pdu->jobs);
1806                 ops->start(client, cmd);
1807                 break;
1808                 }
1809         case FIO_NET_CMD_STOP: {
1810                 struct cmd_end_pdu *pdu = (struct cmd_end_pdu *) cmd->payload;
1811
1812                 convert_stop(cmd);
1813                 client->state = Client_stopped;
1814                 client->error = le32_to_cpu(pdu->error);
1815                 client->signal = le32_to_cpu(pdu->signal);
1816                 ops->stop(client);
1817                 break;
1818                 }
1819         case FIO_NET_CMD_ADD_JOB: {
1820                 struct cmd_add_job_pdu *pdu = (struct cmd_add_job_pdu *) cmd->payload;
1821
1822                 client->thread_number = le32_to_cpu(pdu->thread_number);
1823                 client->groupid = le32_to_cpu(pdu->groupid);
1824
1825                 if (ops->add_job)
1826                         ops->add_job(client, cmd);
1827                 break;
1828                 }
1829         case FIO_NET_CMD_IOLOG:
1830                 fio_client_handle_iolog(client, cmd);
1831                 break;
1832         case FIO_NET_CMD_UPDATE_JOB:
1833                 ops->update_job(client, cmd);
1834                 remove_reply_cmd(client, cmd);
1835                 break;
1836         case FIO_NET_CMD_VTRIGGER: {
1837                 struct all_io_list *pdu = (struct all_io_list *) cmd->payload;
1838                 char buf[128];
1839                 int off = 0;
1840
1841                 if (aux_path) {
1842                         strcpy(buf, aux_path);
1843                         off = strlen(buf);
1844                 }
1845
1846                 __verify_save_state(pdu, server_name(client, &buf[off], sizeof(buf) - off));
1847                 exec_trigger(trigger_cmd);
1848                 break;
1849                 }
1850         case FIO_NET_CMD_SENDFILE: {
1851                 struct cmd_sendfile *pdu = (struct cmd_sendfile *) cmd->payload;
1852                 fio_send_file(client, pdu, cmd->tag);
1853                 break;
1854                 }
1855         case FIO_NET_CMD_JOB_OPT: {
1856                 handle_job_opt(client, cmd);
1857                 break;
1858         }
1859         default:
1860                 log_err("fio: unknown client op: %s\n", fio_server_op(cmd->opcode));
1861                 break;
1862         }
1863
1864         free(cmd);
1865         return 1;
1866 }
1867
1868 int fio_clients_send_trigger(const char *cmd)
1869 {
1870         struct flist_head *entry;
1871         struct fio_client *client;
1872         size_t slen;
1873
1874         dprint(FD_NET, "client: send vtrigger: %s\n", cmd);
1875
1876         if (!cmd)
1877                 slen = 0;
1878         else
1879                 slen = strlen(cmd);
1880
1881         flist_for_each(entry, &client_list) {
1882                 struct cmd_vtrigger_pdu *pdu;
1883
1884                 client = flist_entry(entry, struct fio_client, list);
1885
1886                 pdu = malloc(sizeof(*pdu) + slen);
1887                 pdu->len = cpu_to_le16((uint16_t) slen);
1888                 if (slen)
1889                         memcpy(pdu->cmd, cmd, slen);
1890                 fio_net_send_cmd(client->fd, FIO_NET_CMD_VTRIGGER, pdu,
1891                                         sizeof(*pdu) + slen, NULL, NULL);
1892                 free(pdu);
1893         }
1894
1895         return 0;
1896 }
1897
1898 static void request_client_etas(struct client_ops *ops)
1899 {
1900         struct fio_client *client;
1901         struct flist_head *entry;
1902         struct client_eta *eta;
1903         int skipped = 0;
1904
1905         if (eta_print == FIO_ETA_NEVER)
1906                 return;
1907
1908         dprint(FD_NET, "client: request eta (%d)\n", nr_clients);
1909
1910         eta = calloc(1, sizeof(*eta) + __THREAD_RUNSTR_SZ(REAL_MAX_JOBS));
1911         eta->pending = nr_clients;
1912
1913         flist_for_each(entry, &client_list) {
1914                 client = flist_entry(entry, struct fio_client, list);
1915
1916                 if (!flist_empty(&client->eta_list)) {
1917                         skipped++;
1918                         continue;
1919                 }
1920                 if (client->state != Client_running)
1921                         continue;
1922
1923                 assert(!client->eta_in_flight);
1924                 flist_add_tail(&client->eta_list, &eta_list);
1925                 client->eta_in_flight = eta;
1926                 fio_net_send_simple_cmd(client->fd, FIO_NET_CMD_SEND_ETA,
1927                                         (uintptr_t) eta, &client->cmd_list);
1928         }
1929
1930         while (skipped--) {
1931                 if (!fio_client_dec_jobs_eta(eta, ops->eta))
1932                         break;
1933         }
1934
1935         dprint(FD_NET, "client: requested eta tag %p\n", eta);
1936 }
1937
1938 /*
1939  * A single SEND_ETA timeout isn't fatal. Attempt to recover.
1940  */
1941 static int handle_cmd_timeout(struct fio_client *client,
1942                               struct fio_net_cmd_reply *reply)
1943 {
1944         uint16_t reply_opcode = reply->opcode;
1945
1946         flist_del(&reply->list);
1947         free(reply);
1948
1949         if (reply_opcode != FIO_NET_CMD_SEND_ETA)
1950                 return 1;
1951
1952         log_info("client <%s>: timeout on SEND_ETA\n", client->hostname);
1953
1954         flist_del_init(&client->eta_list);
1955         if (client->eta_in_flight) {
1956                 fio_client_dec_jobs_eta(client->eta_in_flight, client->ops->eta);
1957                 client->eta_in_flight = NULL;
1958         }
1959
1960         /*
1961          * If we fail 5 in a row, give up...
1962          */
1963         if (client->eta_timeouts++ > 5)
1964                 return 1;
1965
1966         return 0;
1967 }
1968
1969 static int client_check_cmd_timeout(struct fio_client *client,
1970                                     struct timespec *now)
1971 {
1972         struct fio_net_cmd_reply *reply;
1973         struct flist_head *entry, *tmp;
1974         int ret = 0;
1975
1976         flist_for_each_safe(entry, tmp, &client->cmd_list) {
1977                 unsigned int op;
1978
1979                 reply = flist_entry(entry, struct fio_net_cmd_reply, list);
1980
1981                 if (mtime_since(&reply->ts, now) < FIO_NET_CLIENT_TIMEOUT)
1982                         continue;
1983
1984                 op = reply->opcode;
1985                 if (!handle_cmd_timeout(client, reply))
1986                         continue;
1987
1988                 log_err("fio: client %s, timeout on cmd %s\n", client->hostname,
1989                                                 fio_server_op(op));
1990                 ret = 1;
1991         }
1992
1993         return flist_empty(&client->cmd_list) && ret;
1994 }
1995
1996 static int fio_check_clients_timed_out(void)
1997 {
1998         struct fio_client *client;
1999         struct flist_head *entry, *tmp;
2000         struct timespec ts;
2001         int ret = 0;
2002
2003         fio_gettime(&ts, NULL);
2004
2005         flist_for_each_safe(entry, tmp, &client_list) {
2006                 client = flist_entry(entry, struct fio_client, list);
2007
2008                 if (flist_empty(&client->cmd_list))
2009                         continue;
2010
2011                 if (!client_check_cmd_timeout(client, &ts))
2012                         continue;
2013
2014                 if (client->ops->timed_out)
2015                         client->ops->timed_out(client);
2016                 else
2017                         log_err("fio: client %s timed out\n", client->hostname);
2018
2019                 if (client->last_cmd != FIO_NET_CMD_VTRIGGER)
2020                         client->error = ETIMEDOUT;
2021                 else
2022                         log_info("fio: ignoring timeout due to vtrigger\n");
2023                 remove_client(client);
2024                 ret = 1;
2025         }
2026
2027         return ret;
2028 }
2029
2030 int fio_handle_clients(struct client_ops *ops)
2031 {
2032         struct pollfd *pfds;
2033         int i, ret = 0, retval = 0;
2034
2035         fio_gettime(&eta_ts, NULL);
2036
2037         pfds = malloc(nr_clients * sizeof(struct pollfd));
2038
2039         init_thread_stat(&client_ts);
2040         init_group_run_stat(&client_gs);
2041
2042         while (!exit_backend && nr_clients) {
2043                 struct flist_head *entry, *tmp;
2044                 struct fio_client *client;
2045
2046                 i = 0;
2047                 flist_for_each_safe(entry, tmp, &client_list) {
2048                         client = flist_entry(entry, struct fio_client, list);
2049
2050                         if (!client->sent_job && !client->ops->stay_connected &&
2051                             flist_empty(&client->cmd_list)) {
2052                                 remove_client(client);
2053                                 continue;
2054                         }
2055
2056                         pfds[i].fd = client->fd;
2057                         pfds[i].events = POLLIN;
2058                         i++;
2059                 }
2060
2061                 if (!nr_clients)
2062                         break;
2063
2064                 assert(i == nr_clients);
2065
2066                 do {
2067                         struct timespec ts;
2068                         int timeout;
2069
2070                         fio_gettime(&ts, NULL);
2071                         if (eta_time_within_slack(mtime_since(&eta_ts, &ts))) {
2072                                 request_client_etas(ops);
2073                                 memcpy(&eta_ts, &ts, sizeof(ts));
2074
2075                                 if (fio_check_clients_timed_out())
2076                                         break;
2077                         }
2078
2079                         check_trigger_file();
2080
2081                         timeout = min(100u, ops->eta_msec);
2082
2083                         ret = poll(pfds, nr_clients, timeout);
2084                         if (ret < 0) {
2085                                 if (errno == EINTR)
2086                                         continue;
2087                                 log_err("fio: poll clients: %s\n", strerror(errno));
2088                                 break;
2089                         } else if (!ret)
2090                                 continue;
2091                 } while (ret <= 0);
2092
2093                 for (i = 0; i < nr_clients; i++) {
2094                         if (!(pfds[i].revents & POLLIN))
2095                                 continue;
2096
2097                         client = find_client_by_fd(pfds[i].fd);
2098                         if (!client) {
2099                                 log_err("fio: unknown client fd %ld\n", (long) pfds[i].fd);
2100                                 continue;
2101                         }
2102                         if (!fio_handle_client(client)) {
2103                                 log_info("client: host=%s disconnected\n",
2104                                                 client->hostname);
2105                                 remove_client(client);
2106                                 retval = 1;
2107                         } else if (client->error)
2108                                 retval = 1;
2109                         fio_put_client(client);
2110                 }
2111         }
2112
2113         fio_client_json_fini();
2114
2115         free(pfds);
2116         return retval || error_clients;
2117 }
2118
2119 static void client_display_thread_status(struct jobs_eta *je)
2120 {
2121         if (!(output_format & FIO_OUTPUT_JSON))
2122                 display_thread_status(je);
2123 }