client: don't leak file descriptor on exit
[fio.git] / client.c
1 #include <stdio.h>
2 #include <stdlib.h>
3 #include <unistd.h>
4 #include <limits.h>
5 #include <errno.h>
6 #include <fcntl.h>
7 #include <sys/poll.h>
8 #include <sys/types.h>
9 #include <sys/stat.h>
10 #include <sys/wait.h>
11 #include <sys/socket.h>
12 #include <sys/un.h>
13 #include <netinet/in.h>
14 #include <arpa/inet.h>
15 #include <netdb.h>
16 #include <signal.h>
17 #include <zlib.h>
18
19 #include "fio.h"
20 #include "client.h"
21 #include "server.h"
22 #include "flist.h"
23 #include "hash.h"
24
25 static void handle_du(struct fio_client *client, struct fio_net_cmd *cmd);
26 static void handle_ts(struct fio_client *client, struct fio_net_cmd *cmd);
27 static void handle_gs(struct fio_client *client, struct fio_net_cmd *cmd);
28 static void handle_probe(struct fio_client *client, struct fio_net_cmd *cmd);
29 static void handle_text(struct fio_client *client, struct fio_net_cmd *cmd);
30 static void handle_stop(struct fio_client *client, struct fio_net_cmd *cmd);
31 static void handle_start(struct fio_client *client, struct fio_net_cmd *cmd);
32
33 struct client_ops fio_client_ops = {
34         .text           = handle_text,
35         .disk_util      = handle_du,
36         .thread_status  = handle_ts,
37         .group_stats    = handle_gs,
38         .stop           = handle_stop,
39         .start          = handle_start,
40         .eta            = display_thread_status,
41         .probe          = handle_probe,
42         .eta_msec       = FIO_CLIENT_DEF_ETA_MSEC,
43         .client_type    = FIO_CLIENT_TYPE_CLI,
44 };
45
46 static struct timeval eta_tv;
47
48 static FLIST_HEAD(client_list);
49 static FLIST_HEAD(eta_list);
50
51 static FLIST_HEAD(arg_list);
52
53 struct thread_stat client_ts;
54 struct group_run_stats client_gs;
55 int sum_stat_clients;
56
57 static int sum_stat_nr;
58
59 #define FIO_CLIENT_HASH_BITS    7
60 #define FIO_CLIENT_HASH_SZ      (1 << FIO_CLIENT_HASH_BITS)
61 #define FIO_CLIENT_HASH_MASK    (FIO_CLIENT_HASH_SZ - 1)
62 static struct flist_head client_hash[FIO_CLIENT_HASH_SZ];
63
64 static void fio_client_add_hash(struct fio_client *client)
65 {
66         int bucket = hash_long(client->fd, FIO_CLIENT_HASH_BITS);
67
68         bucket &= FIO_CLIENT_HASH_MASK;
69         flist_add(&client->hash_list, &client_hash[bucket]);
70 }
71
72 static void fio_client_remove_hash(struct fio_client *client)
73 {
74         if (!flist_empty(&client->hash_list))
75                 flist_del_init(&client->hash_list);
76 }
77
78 static void fio_init fio_client_hash_init(void)
79 {
80         int i;
81
82         for (i = 0; i < FIO_CLIENT_HASH_SZ; i++)
83                 INIT_FLIST_HEAD(&client_hash[i]);
84 }
85
86 static struct fio_client *find_client_by_fd(int fd)
87 {
88         int bucket = hash_long(fd, FIO_CLIENT_HASH_BITS) & FIO_CLIENT_HASH_MASK;
89         struct fio_client *client;
90         struct flist_head *entry;
91
92         flist_for_each(entry, &client_hash[bucket]) {
93                 client = flist_entry(entry, struct fio_client, hash_list);
94
95                 if (client->fd == fd) {
96                         client->refs++;
97                         return client;
98                 }
99         }
100
101         return NULL;
102 }
103
104 void fio_put_client(struct fio_client *client)
105 {
106         if (--client->refs)
107                 return;
108
109         free(client->hostname);
110         if (client->argv)
111                 free(client->argv);
112         if (client->name)
113                 free(client->name);
114
115         free(client);
116 }
117
118 static void remove_client(struct fio_client *client)
119 {
120         assert(client->refs);
121
122         dprint(FD_NET, "client: removed <%s>\n", client->hostname);
123
124         if (!flist_empty(&client->list))
125                 flist_del_init(&client->list);
126
127         fio_client_remove_hash(client);
128
129         if (!flist_empty(&client->eta_list)) {
130                 flist_del_init(&client->eta_list);
131                 fio_client_dec_jobs_eta(client->eta_in_flight, client->ops->eta);
132         }
133
134         close(client->fd);
135         client->fd = -1;
136
137         nr_clients--;
138         sum_stat_clients--;
139
140         fio_put_client(client);
141 }
142
143 struct fio_client *fio_get_client(struct fio_client *client)
144 {
145         client->refs++;
146         return client;
147 }
148
149 static void __fio_client_add_cmd_option(struct fio_client *client,
150                                         const char *opt)
151 {
152         int index;
153
154         index = client->argc++;
155         client->argv = realloc(client->argv, sizeof(char *) * client->argc);
156         client->argv[index] = strdup(opt);
157         dprint(FD_NET, "client: add cmd %d: %s\n", index, opt);
158 }
159
160 void fio_client_add_cmd_option(void *cookie, const char *opt)
161 {
162         struct fio_client *client = cookie;
163         struct flist_head *entry;
164
165         if (!client || !opt)
166                 return;
167
168         __fio_client_add_cmd_option(client, opt);
169
170         /*
171          * Duplicate arguments to shared client group
172          */
173         flist_for_each(entry, &arg_list) {
174                 client = flist_entry(entry, struct fio_client, arg_list);
175
176                 __fio_client_add_cmd_option(client, opt);
177         }
178 }
179
180 struct fio_client *fio_client_add_explicit(struct client_ops *ops,
181                                            const char *hostname, int type,
182                                            int port)
183 {
184         struct fio_client *client;
185
186         client = malloc(sizeof(*client));
187         memset(client, 0, sizeof(*client));
188
189         INIT_FLIST_HEAD(&client->list);
190         INIT_FLIST_HEAD(&client->hash_list);
191         INIT_FLIST_HEAD(&client->arg_list);
192         INIT_FLIST_HEAD(&client->eta_list);
193         INIT_FLIST_HEAD(&client->cmd_list);
194
195         client->hostname = strdup(hostname);
196
197         if (type == Fio_client_socket)
198                 client->is_sock = 1;
199         else {
200                 int ipv6;
201
202                 ipv6 = type == Fio_client_ipv6;
203                 if (fio_server_parse_host(hostname, &ipv6,
204                                                 &client->addr.sin_addr,
205                                                 &client->addr6.sin6_addr))
206                         goto err;
207
208                 client->port = port;
209         }
210
211         client->fd = -1;
212         client->ops = ops;
213         client->refs = 1;
214         client->type = ops->client_type;
215
216         __fio_client_add_cmd_option(client, "fio");
217
218         flist_add(&client->list, &client_list);
219         nr_clients++;
220         dprint(FD_NET, "client: added <%s>\n", client->hostname);
221         return client;
222 err:
223         free(client);
224         return NULL;
225 }
226
227 int fio_client_add(struct client_ops *ops, const char *hostname, void **cookie)
228 {
229         struct fio_client *existing = *cookie;
230         struct fio_client *client;
231
232         if (existing) {
233                 /*
234                  * We always add our "exec" name as the option, hence 1
235                  * means empty.
236                  */
237                 if (existing->argc == 1)
238                         flist_add_tail(&existing->arg_list, &arg_list);
239                 else {
240                         while (!flist_empty(&arg_list))
241                                 flist_del_init(arg_list.next);
242                 }
243         }
244
245         client = malloc(sizeof(*client));
246         memset(client, 0, sizeof(*client));
247
248         INIT_FLIST_HEAD(&client->list);
249         INIT_FLIST_HEAD(&client->hash_list);
250         INIT_FLIST_HEAD(&client->arg_list);
251         INIT_FLIST_HEAD(&client->eta_list);
252         INIT_FLIST_HEAD(&client->cmd_list);
253
254         if (fio_server_parse_string(hostname, &client->hostname,
255                                         &client->is_sock, &client->port,
256                                         &client->addr.sin_addr,
257                                         &client->addr6.sin6_addr,
258                                         &client->ipv6))
259                 return -1;
260
261         client->fd = -1;
262         client->ops = ops;
263         client->refs = 1;
264         client->type = ops->client_type;
265
266         __fio_client_add_cmd_option(client, "fio");
267
268         flist_add(&client->list, &client_list);
269         nr_clients++;
270         dprint(FD_NET, "client: added <%s>\n", client->hostname);
271         *cookie = client;
272         return 0;
273 }
274
275 static void probe_client(struct fio_client *client)
276 {
277         dprint(FD_NET, "client: send probe\n");
278
279         fio_net_send_simple_cmd(client->fd, FIO_NET_CMD_PROBE, 0, &client->cmd_list);
280 }
281
282 static int fio_client_connect_ip(struct fio_client *client)
283 {
284         struct sockaddr *addr;
285         fio_socklen_t socklen;
286         int fd, domain;
287
288         if (client->ipv6) {
289                 client->addr6.sin6_family = AF_INET6;
290                 client->addr6.sin6_port = htons(client->port);
291                 domain = AF_INET6;
292                 addr = (struct sockaddr *) &client->addr6;
293                 socklen = sizeof(client->addr6);
294         } else {
295                 client->addr.sin_family = AF_INET;
296                 client->addr.sin_port = htons(client->port);
297                 domain = AF_INET;
298                 addr = (struct sockaddr *) &client->addr;
299                 socklen = sizeof(client->addr);
300         }
301
302         fd = socket(domain, SOCK_STREAM, 0);
303         if (fd < 0) {
304                 int ret = -errno;
305
306                 log_err("fio: socket: %s\n", strerror(errno));
307                 return ret;
308         }
309
310         if (connect(fd, addr, socklen) < 0) {
311                 int ret = -errno;
312
313                 log_err("fio: connect: %s\n", strerror(errno));
314                 log_err("fio: failed to connect to %s:%u\n", client->hostname,
315                                                                 client->port);
316                 close(fd);
317                 return ret;
318         }
319
320         return fd;
321 }
322
323 static int fio_client_connect_sock(struct fio_client *client)
324 {
325         struct sockaddr_un *addr = &client->addr_un;
326         fio_socklen_t len;
327         int fd;
328
329         memset(addr, 0, sizeof(*addr));
330         addr->sun_family = AF_UNIX;
331         strcpy(addr->sun_path, client->hostname);
332
333         fd = socket(AF_UNIX, SOCK_STREAM, 0);
334         if (fd < 0) {
335                 int ret = -errno;
336
337                 log_err("fio: socket: %s\n", strerror(errno));
338                 return ret;
339         }
340
341         len = sizeof(addr->sun_family) + strlen(addr->sun_path) + 1;
342         if (connect(fd, (struct sockaddr *) addr, len) < 0) {
343                 int ret = -errno;
344
345                 log_err("fio: connect; %s\n", strerror(errno));
346                 close(fd);
347                 return ret;
348         }
349
350         return fd;
351 }
352
353 int fio_client_connect(struct fio_client *client)
354 {
355         int fd;
356
357         dprint(FD_NET, "client: connect to host %s\n", client->hostname);
358
359         if (client->is_sock)
360                 fd = fio_client_connect_sock(client);
361         else
362                 fd = fio_client_connect_ip(client);
363
364         dprint(FD_NET, "client: %s connected %d\n", client->hostname, fd);
365
366         if (fd < 0)
367                 return fd;
368
369         client->fd = fd;
370         fio_client_add_hash(client);
371         client->state = Client_connected;
372
373         probe_client(client);
374         return 0;
375 }
376
377 void fio_client_terminate(struct fio_client *client)
378 {
379         fio_net_send_quit(client->fd);
380 }
381
382 void fio_clients_terminate(void)
383 {
384         struct flist_head *entry;
385         struct fio_client *client;
386
387         dprint(FD_NET, "client: terminate clients\n");
388
389         flist_for_each(entry, &client_list) {
390                 client = flist_entry(entry, struct fio_client, list);
391                 fio_client_terminate(client);
392         }
393 }
394
395 static void sig_int(int sig)
396 {
397         dprint(FD_NET, "client: got signal %d\n", sig);
398         fio_clients_terminate();
399 }
400
401 static void client_signal_handler(void)
402 {
403         struct sigaction act;
404
405         memset(&act, 0, sizeof(act));
406         act.sa_handler = sig_int;
407         act.sa_flags = SA_RESTART;
408         sigaction(SIGINT, &act, NULL);
409
410         memset(&act, 0, sizeof(act));
411         act.sa_handler = sig_int;
412         act.sa_flags = SA_RESTART;
413         sigaction(SIGTERM, &act, NULL);
414 }
415
416 static int send_client_cmd_line(struct fio_client *client)
417 {
418         struct cmd_single_line_pdu *cslp;
419         struct cmd_line_pdu *clp;
420         unsigned long offset;
421         unsigned int *lens;
422         void *pdu;
423         size_t mem;
424         int i, ret;
425
426         dprint(FD_NET, "client: send cmdline %d\n", client->argc);
427
428         lens = malloc(client->argc * sizeof(unsigned int));
429
430         /*
431          * Find out how much mem we need
432          */
433         for (i = 0, mem = 0; i < client->argc; i++) {
434                 lens[i] = strlen(client->argv[i]) + 1;
435                 mem += lens[i];
436         }
437
438         /*
439          * We need one cmd_line_pdu, and argc number of cmd_single_line_pdu
440          */
441         mem += sizeof(*clp) + (client->argc * sizeof(*cslp));
442
443         pdu = malloc(mem);
444         clp = pdu;
445         offset = sizeof(*clp);
446
447         for (i = 0; i < client->argc; i++) {
448                 uint16_t arg_len = lens[i];
449
450                 cslp = pdu + offset;
451                 strcpy((char *) cslp->text, client->argv[i]);
452                 cslp->len = cpu_to_le16(arg_len);
453                 offset += sizeof(*cslp) + arg_len;
454         }
455
456         free(lens);
457         clp->lines = cpu_to_le16(client->argc);
458         clp->client_type = __cpu_to_le16(client->type);
459         ret = fio_net_send_cmd(client->fd, FIO_NET_CMD_JOBLINE, pdu, mem, 0);
460         free(pdu);
461         return ret;
462 }
463
464 int fio_clients_connect(void)
465 {
466         struct fio_client *client;
467         struct flist_head *entry, *tmp;
468         int ret;
469
470 #ifdef WIN32
471         WSADATA wsd;
472         WSAStartup(MAKEWORD(2,2), &wsd);
473 #endif
474
475         dprint(FD_NET, "client: connect all\n");
476
477         client_signal_handler();
478
479         flist_for_each_safe(entry, tmp, &client_list) {
480                 client = flist_entry(entry, struct fio_client, list);
481
482                 ret = fio_client_connect(client);
483                 if (ret) {
484                         remove_client(client);
485                         continue;
486                 }
487
488                 if (client->argc > 1)
489                         send_client_cmd_line(client);
490         }
491
492         return !nr_clients;
493 }
494
495 int fio_start_client(struct fio_client *client)
496 {
497         dprint(FD_NET, "client: start %s\n", client->hostname);
498         return fio_net_send_simple_cmd(client->fd, FIO_NET_CMD_RUN, 0, NULL);
499 }
500
501 int fio_start_all_clients(void)
502 {
503         struct fio_client *client;
504         struct flist_head *entry, *tmp;
505         int ret;
506
507         dprint(FD_NET, "client: start all\n");
508
509         flist_for_each_safe(entry, tmp, &client_list) {
510                 client = flist_entry(entry, struct fio_client, list);
511
512                 ret = fio_start_client(client);
513                 if (ret) {
514                         remove_client(client);
515                         continue;
516                 }
517         }
518
519         return flist_empty(&client_list);
520 }
521
522 /*
523  * Send file contents to server backend. We could use sendfile(), but to remain
524  * more portable lets just read/write the darn thing.
525  */
526 static int __fio_client_send_ini(struct fio_client *client, const char *filename)
527 {
528         struct cmd_job_pdu *pdu;
529         size_t p_size;
530         struct stat sb;
531         char *p;
532         void *buf;
533         off_t len;
534         int fd, ret;
535
536         dprint(FD_NET, "send ini %s to %s\n", filename, client->hostname);
537
538         fd = open(filename, O_RDONLY);
539         if (fd < 0) {
540                 int ret = -errno;
541
542                 log_err("fio: job file <%s> open: %s\n", filename, strerror(errno));
543                 return ret;
544         }
545
546         if (fstat(fd, &sb) < 0) {
547                 int ret = -errno;
548
549                 log_err("fio: job file stat: %s\n", strerror(errno));
550                 close(fd);
551                 return ret;
552         }
553
554         p_size = sb.st_size + sizeof(*pdu);
555         pdu = malloc(p_size);
556         buf = pdu->buf;
557
558         len = sb.st_size;
559         p = buf;
560         do {
561                 ret = read(fd, p, len);
562                 if (ret > 0) {
563                         len -= ret;
564                         if (!len)
565                                 break;
566                         p += ret;
567                         continue;
568                 } else if (!ret)
569                         break;
570                 else if (errno == EAGAIN || errno == EINTR)
571                         continue;
572         } while (1);
573
574         if (len) {
575                 log_err("fio: failed reading job file %s\n", filename);
576                 close(fd);
577                 free(buf);
578                 return 1;
579         }
580
581         pdu->buf_len = __cpu_to_le32(sb.st_size);
582         pdu->client_type = cpu_to_le32(client->type);
583
584         client->sent_job = 1;
585         ret = fio_net_send_cmd(client->fd, FIO_NET_CMD_JOB, pdu, p_size, 0);
586         free(pdu);
587         close(fd);
588         return ret;
589 }
590
591 int fio_client_send_ini(struct fio_client *client, const char *filename)
592 {
593         int ret;
594
595         ret = __fio_client_send_ini(client, filename);
596         if (!ret)
597                 client->sent_job = 1;
598
599         return ret;
600 }
601
602 int fio_clients_send_ini(const char *filename)
603 {
604         struct fio_client *client;
605         struct flist_head *entry, *tmp;
606
607         flist_for_each_safe(entry, tmp, &client_list) {
608                 client = flist_entry(entry, struct fio_client, list);
609
610                 if (fio_client_send_ini(client, filename))
611                         remove_client(client);
612         }
613
614         return !nr_clients;
615 }
616
617 static void convert_io_stat(struct io_stat *dst, struct io_stat *src)
618 {
619         dst->max_val    = le64_to_cpu(src->max_val);
620         dst->min_val    = le64_to_cpu(src->min_val);
621         dst->samples    = le64_to_cpu(src->samples);
622
623         /*
624          * Floats arrive as IEEE 754 encoded uint64_t, convert back to double
625          */
626         dst->mean.u.f   = fio_uint64_to_double(le64_to_cpu(dst->mean.u.i));
627         dst->S.u.f      = fio_uint64_to_double(le64_to_cpu(dst->S.u.i));
628 }
629
630 static void convert_ts(struct thread_stat *dst, struct thread_stat *src)
631 {
632         int i, j;
633
634         dst->error              = le32_to_cpu(src->error);
635         dst->thread_number      = le32_to_cpu(src->thread_number);
636         dst->groupid            = le32_to_cpu(src->groupid);
637         dst->pid                = le32_to_cpu(src->pid);
638         dst->members            = le32_to_cpu(src->members);
639
640         for (i = 0; i < 2; i++) {
641                 convert_io_stat(&dst->clat_stat[i], &src->clat_stat[i]);
642                 convert_io_stat(&dst->slat_stat[i], &src->slat_stat[i]);
643                 convert_io_stat(&dst->lat_stat[i], &src->lat_stat[i]);
644                 convert_io_stat(&dst->bw_stat[i], &src->bw_stat[i]);
645         }
646
647         dst->usr_time           = le64_to_cpu(src->usr_time);
648         dst->sys_time           = le64_to_cpu(src->sys_time);
649         dst->ctx                = le64_to_cpu(src->ctx);
650         dst->minf               = le64_to_cpu(src->minf);
651         dst->majf               = le64_to_cpu(src->majf);
652         dst->clat_percentiles   = le64_to_cpu(src->clat_percentiles);
653
654         for (i = 0; i < FIO_IO_U_LIST_MAX_LEN; i++) {
655                 fio_fp64_t *fps = &src->percentile_list[i];
656                 fio_fp64_t *fpd = &dst->percentile_list[i];
657
658                 fpd->u.f = fio_uint64_to_double(le64_to_cpu(fps->u.i));
659         }
660
661         for (i = 0; i < FIO_IO_U_MAP_NR; i++) {
662                 dst->io_u_map[i]        = le32_to_cpu(src->io_u_map[i]);
663                 dst->io_u_submit[i]     = le32_to_cpu(src->io_u_submit[i]);
664                 dst->io_u_complete[i]   = le32_to_cpu(src->io_u_complete[i]);
665         }
666
667         for (i = 0; i < FIO_IO_U_LAT_U_NR; i++) {
668                 dst->io_u_lat_u[i]      = le32_to_cpu(src->io_u_lat_u[i]);
669                 dst->io_u_lat_m[i]      = le32_to_cpu(src->io_u_lat_m[i]);
670         }
671
672         for (i = 0; i < 2; i++)
673                 for (j = 0; j < FIO_IO_U_PLAT_NR; j++)
674                         dst->io_u_plat[i][j] = le32_to_cpu(src->io_u_plat[i][j]);
675
676         for (i = 0; i < 3; i++) {
677                 dst->total_io_u[i]      = le64_to_cpu(src->total_io_u[i]);
678                 dst->short_io_u[i]      = le64_to_cpu(src->short_io_u[i]);
679         }
680
681         dst->total_submit       = le64_to_cpu(src->total_submit);
682         dst->total_complete     = le64_to_cpu(src->total_complete);
683
684         for (i = 0; i < 2; i++) {
685                 dst->io_bytes[i]        = le64_to_cpu(src->io_bytes[i]);
686                 dst->runtime[i]         = le64_to_cpu(src->runtime[i]);
687         }
688
689         dst->total_run_time     = le64_to_cpu(src->total_run_time);
690         dst->continue_on_error  = le16_to_cpu(src->continue_on_error);
691         dst->total_err_count    = le64_to_cpu(src->total_err_count);
692         dst->first_error        = le32_to_cpu(src->first_error);
693         dst->kb_base            = le32_to_cpu(src->kb_base);
694 }
695
696 static void convert_gs(struct group_run_stats *dst, struct group_run_stats *src)
697 {
698         int i;
699
700         for (i = 0; i < 2; i++) {
701                 dst->max_run[i]         = le64_to_cpu(src->max_run[i]);
702                 dst->min_run[i]         = le64_to_cpu(src->min_run[i]);
703                 dst->max_bw[i]          = le64_to_cpu(src->max_bw[i]);
704                 dst->min_bw[i]          = le64_to_cpu(src->min_bw[i]);
705                 dst->io_kb[i]           = le64_to_cpu(src->io_kb[i]);
706                 dst->agg[i]             = le64_to_cpu(src->agg[i]);
707         }
708
709         dst->kb_base    = le32_to_cpu(src->kb_base);
710         dst->groupid    = le32_to_cpu(src->groupid);
711 }
712
713 static void handle_ts(struct fio_client *client, struct fio_net_cmd *cmd)
714 {
715         struct cmd_ts_pdu *p = (struct cmd_ts_pdu *) cmd->payload;
716
717         show_thread_status(&p->ts, &p->rs);
718
719         if (sum_stat_clients == 1)
720                 return;
721
722         sum_thread_stats(&client_ts, &p->ts, sum_stat_nr);
723         sum_group_stats(&client_gs, &p->rs);
724
725         client_ts.members++;
726         client_ts.thread_number = p->ts.thread_number;
727         client_ts.groupid = p->ts.groupid;
728
729         if (++sum_stat_nr == sum_stat_clients) {
730                 strcpy(client_ts.name, "All clients");
731                 show_thread_status(&client_ts, &client_gs);
732         }
733 }
734
735 static void handle_gs(struct fio_client *client, struct fio_net_cmd *cmd)
736 {
737         struct group_run_stats *gs = (struct group_run_stats *) cmd->payload;
738
739         show_group_stats(gs);
740 }
741
742 static void handle_text(struct fio_client *client, struct fio_net_cmd *cmd)
743 {
744         struct cmd_text_pdu *pdu = (struct cmd_text_pdu *) cmd->payload;
745         const char *buf = (const char *) pdu->buf;
746         const char *name;
747         int fio_unused ret;
748
749         name = client->name ? client->name : client->hostname;
750
751         if (!client->skip_newline)
752                 fprintf(f_out, "<%s> ", name);
753         ret = fwrite(buf, pdu->buf_len, 1, f_out);
754         fflush(f_out);
755         client->skip_newline = strchr(buf, '\n') == NULL;
756 }
757
758 static void convert_agg(struct disk_util_agg *agg)
759 {
760         int i;
761
762         for (i = 0; i < 2; i++) {
763                 agg->ios[i]     = le32_to_cpu(agg->ios[i]);
764                 agg->merges[i]  = le32_to_cpu(agg->merges[i]);
765                 agg->sectors[i] = le64_to_cpu(agg->sectors[i]);
766                 agg->ticks[i]   = le32_to_cpu(agg->ticks[i]);
767         }
768
769         agg->io_ticks           = le32_to_cpu(agg->io_ticks);
770         agg->time_in_queue      = le32_to_cpu(agg->time_in_queue);
771         agg->slavecount         = le32_to_cpu(agg->slavecount);
772         agg->max_util.u.f       = fio_uint64_to_double(__le64_to_cpu(agg->max_util.u.i));
773 }
774
775 static void convert_dus(struct disk_util_stat *dus)
776 {
777         int i;
778
779         for (i = 0; i < 2; i++) {
780                 dus->ios[i]     = le32_to_cpu(dus->ios[i]);
781                 dus->merges[i]  = le32_to_cpu(dus->merges[i]);
782                 dus->sectors[i] = le64_to_cpu(dus->sectors[i]);
783                 dus->ticks[i]   = le32_to_cpu(dus->ticks[i]);
784         }
785
786         dus->io_ticks           = le32_to_cpu(dus->io_ticks);
787         dus->time_in_queue      = le32_to_cpu(dus->time_in_queue);
788         dus->msec               = le64_to_cpu(dus->msec);
789 }
790
791 static void handle_du(struct fio_client *client, struct fio_net_cmd *cmd)
792 {
793         struct cmd_du_pdu *du = (struct cmd_du_pdu *) cmd->payload;
794
795         if (!client->disk_stats_shown) {
796                 client->disk_stats_shown = 1;
797                 log_info("\nDisk stats (read/write):\n");
798         }
799
800         print_disk_util(&du->dus, &du->agg, terse_output);
801 }
802
803 static void convert_jobs_eta(struct jobs_eta *je)
804 {
805         int i;
806
807         je->nr_running          = le32_to_cpu(je->nr_running);
808         je->nr_ramp             = le32_to_cpu(je->nr_ramp);
809         je->nr_pending          = le32_to_cpu(je->nr_pending);
810         je->files_open          = le32_to_cpu(je->files_open);
811
812         for (i = 0; i < 2; i++) {
813                 je->m_rate[i]           = le32_to_cpu(je->m_rate[i]);
814                 je->t_rate[i]           = le32_to_cpu(je->t_rate[i]);
815                 je->m_iops[i]           = le32_to_cpu(je->m_iops[i]);
816                 je->t_iops[i]           = le32_to_cpu(je->t_iops[i]);
817                 je->rate[i]     = le32_to_cpu(je->rate[i]);
818                 je->iops[i]     = le32_to_cpu(je->iops[i]);
819         }
820
821         je->elapsed_sec         = le64_to_cpu(je->elapsed_sec);
822         je->eta_sec             = le64_to_cpu(je->eta_sec);
823         je->nr_threads          = le32_to_cpu(je->nr_threads);
824 }
825
826 void fio_client_sum_jobs_eta(struct jobs_eta *dst, struct jobs_eta *je)
827 {
828         int i;
829
830         dst->nr_running         += je->nr_running;
831         dst->nr_ramp            += je->nr_ramp;
832         dst->nr_pending         += je->nr_pending;
833         dst->files_open         += je->files_open;
834
835         for (i = 0; i < 2; i++) {
836                 dst->m_rate[i]  += je->m_rate[i];
837                 dst->t_rate[i]  += je->t_rate[i];
838                 dst->m_iops[i]  += je->m_iops[i];
839                 dst->t_iops[i]  += je->t_iops[i];
840                 dst->rate[i]    += je->rate[i];
841                 dst->iops[i]    += je->iops[i];
842         }
843
844         dst->elapsed_sec        += je->elapsed_sec;
845
846         if (je->eta_sec > dst->eta_sec)
847                 dst->eta_sec = je->eta_sec;
848
849         dst->nr_threads         += je->nr_threads;
850         /* we need to handle je->run_str too ... */
851 }
852
853 void fio_client_dec_jobs_eta(struct client_eta *eta, client_eta_op eta_fn)
854 {
855         if (!--eta->pending) {
856                 eta_fn(&eta->eta);
857                 free(eta);
858         }
859 }
860
861 static void remove_reply_cmd(struct fio_client *client, struct fio_net_cmd *cmd)
862 {
863         struct fio_net_int_cmd *icmd = NULL;
864         struct flist_head *entry;
865
866         flist_for_each(entry, &client->cmd_list) {
867                 icmd = flist_entry(entry, struct fio_net_int_cmd, list);
868
869                 if (cmd->tag == (uintptr_t) icmd)
870                         break;
871
872                 icmd = NULL;
873         }
874
875         if (!icmd) {
876                 log_err("fio: client: unable to find matching tag\n");
877                 return;
878         }
879
880         flist_del(&icmd->list);
881         cmd->tag = icmd->saved_tag;
882         free(icmd);
883 }
884
885 static void handle_eta(struct fio_client *client, struct fio_net_cmd *cmd)
886 {
887         struct jobs_eta *je = (struct jobs_eta *) cmd->payload;
888         struct client_eta *eta = (struct client_eta *) (uintptr_t) cmd->tag;
889
890         dprint(FD_NET, "client: got eta tag %p, %d\n", eta, eta->pending);
891
892         assert(client->eta_in_flight == eta);
893
894         client->eta_in_flight = NULL;
895         flist_del_init(&client->eta_list);
896
897         if (client->ops->jobs_eta)
898                 client->ops->jobs_eta(client, je);
899
900         fio_client_sum_jobs_eta(&eta->eta, je);
901         fio_client_dec_jobs_eta(eta, client->ops->eta);
902 }
903
904 static void handle_probe(struct fio_client *client, struct fio_net_cmd *cmd)
905 {
906         struct cmd_probe_pdu *probe = (struct cmd_probe_pdu *) cmd->payload;
907         const char *os, *arch;
908         char bit[16];
909
910         os = fio_get_os_string(probe->os);
911         if (!os)
912                 os = "unknown";
913
914         arch = fio_get_arch_string(probe->arch);
915         if (!arch)
916                 os = "unknown";
917
918         sprintf(bit, "%d-bit", probe->bpp * 8);
919
920         log_info("hostname=%s, be=%u, %s, os=%s, arch=%s, fio=%u.%u.%u\n",
921                 probe->hostname, probe->bigendian, bit, os, arch,
922                 probe->fio_major, probe->fio_minor, probe->fio_patch);
923
924         if (!client->name)
925                 client->name = strdup((char *) probe->hostname);
926 }
927
928 static void handle_start(struct fio_client *client, struct fio_net_cmd *cmd)
929 {
930         struct cmd_start_pdu *pdu = (struct cmd_start_pdu *) cmd->payload;
931
932         client->state = Client_started;
933         client->jobs = pdu->jobs;
934 }
935
936 static void handle_stop(struct fio_client *client, struct fio_net_cmd *cmd)
937 {
938         if (client->error)
939                 log_info("client <%s>: exited with error %d\n", client->hostname, client->error);
940 }
941
942 static void convert_stop(struct fio_net_cmd *cmd)
943 {
944         struct cmd_end_pdu *pdu = (struct cmd_end_pdu *) cmd->payload;
945
946         pdu->error = le32_to_cpu(pdu->error);
947 }
948
949 static void convert_text(struct fio_net_cmd *cmd)
950 {
951         struct cmd_text_pdu *pdu = (struct cmd_text_pdu *) cmd->payload;
952
953         pdu->level      = le32_to_cpu(pdu->level);
954         pdu->buf_len    = le32_to_cpu(pdu->buf_len);
955         pdu->log_sec    = le64_to_cpu(pdu->log_sec);
956         pdu->log_usec   = le64_to_cpu(pdu->log_usec);
957 }
958
/*
 * This has been compressed on the server side, since it can be big.
 * Uncompress here.
 *
 * The pdu header (thread number, sample count, log type, name) arrives
 * uncompressed; the samples that follow it are a zlib stream. Returns a
 * newly malloc'ed pdu with header and samples converted to host byte
 * order, or NULL if the header is truncated, the sample count is
 * unrepresentable, memory cannot be allocated, or inflation fails.
 */
static struct cmd_iolog_pdu *convert_iolog(struct fio_net_cmd *cmd)
{
        struct cmd_iolog_pdu *pdu = (struct cmd_iolog_pdu *) cmd->payload;
        struct cmd_iolog_pdu *ret = NULL;
        uint32_t nr_samples, i;
        unsigned long total;
        unsigned char *out;
        z_stream stream;

        /* The fixed header must be present before any field is trusted */
        if (cmd->pdu_len < sizeof(*pdu)) {
                log_err("fio: short iolog pdu (%u bytes)\n",
                                (unsigned int) cmd->pdu_len);
                return NULL;
        }

        stream.zalloc = Z_NULL;
        stream.zfree = Z_NULL;
        stream.opaque = Z_NULL;
        stream.avail_in = 0;
        stream.next_in = Z_NULL;

        if (inflateInit(&stream) != Z_OK)
                return NULL;

        /*
         * Get header first, it's not compressed
         */
        nr_samples = le32_to_cpu(pdu->nr_samples);

        /* Reject counts whose byte size would wrap the allocation */
        if (nr_samples > (ULONG_MAX - sizeof(*pdu)) / sizeof(struct io_sample)) {
                log_err("fio: iolog sample count %u too large\n",
                                (unsigned int) nr_samples);
                goto out;
        }

        total = nr_samples * sizeof(struct io_sample);
        ret = malloc(total + sizeof(*pdu));
        if (!ret) {
                log_err("fio: out of memory for iolog\n");
                goto out;
        }

        ret->thread_number = le32_to_cpu(pdu->thread_number);
        ret->nr_samples = nr_samples;
        ret->log_type = le32_to_cpu(pdu->log_type);

        /*
         * Bounded copy of the fixed-size name field; the sender is not
         * trusted to have NUL-terminated it.
         */
        memcpy(ret->name, pdu->name, sizeof(ret->name));
        ret->name[sizeof(ret->name) - 1] = '\0';

        out = (unsigned char *) ret + sizeof(*pdu);

        stream.avail_in = cmd->pdu_len - sizeof(*pdu);
        stream.next_in = (unsigned char *) pdu + sizeof(*pdu);
        while (stream.avail_in) {
                unsigned int this_chunk = 65536;
                unsigned int this_len;
                int err;

                /* Never hand zlib more room than is left in the buffer */
                if (this_chunk > total)
                        this_chunk = total;

                stream.avail_out = this_chunk;
                stream.next_out = out;
                err = inflate(&stream, Z_NO_FLUSH);
                /* may be Z_OK, or Z_STREAM_END */
                if (err < 0) {
                        log_err("fio: inflate error %d\n", err);
                        free(ret);
                        ret = NULL;
                        goto out;
                }

                this_len = this_chunk - stream.avail_out;
                out += this_len;
                total -= this_len;

                /*
                 * The stream is complete; trailing input bytes would
                 * otherwise keep this loop spinning without progress.
                 */
                if (err == Z_STREAM_END)
                        break;
        }

        for (i = 0; i < ret->nr_samples; i++) {
                struct io_sample *s = &ret->samples[i];

                s->time = le64_to_cpu(s->time);
                s->val  = le64_to_cpu(s->val);
                s->ddir = le32_to_cpu(s->ddir);
                s->bs   = le32_to_cpu(s->bs);
        }

out:
        inflateEnd(&stream);
        return ret;
}
1035
1036 int fio_handle_client(struct fio_client *client)
1037 {
1038         struct client_ops *ops = client->ops;
1039         struct fio_net_cmd *cmd;
1040
1041         dprint(FD_NET, "client: handle %s\n", client->hostname);
1042
1043         cmd = fio_net_recv_cmd(client->fd);
1044         if (!cmd)
1045                 return 0;
1046
1047         dprint(FD_NET, "client: got cmd op %s from %s (pdu=%u)\n",
1048                 fio_server_op(cmd->opcode), client->hostname, cmd->pdu_len);
1049
1050         switch (cmd->opcode) {
1051         case FIO_NET_CMD_QUIT:
1052                 if (ops->quit)
1053                         ops->quit(client, cmd);
1054                 remove_client(client);
1055                 free(cmd);
1056                 break;
1057         case FIO_NET_CMD_TEXT:
1058                 convert_text(cmd);
1059                 ops->text(client, cmd);
1060                 free(cmd);
1061                 break;
1062         case FIO_NET_CMD_DU: {
1063                 struct cmd_du_pdu *du = (struct cmd_du_pdu *) cmd->payload;
1064
1065                 convert_dus(&du->dus);
1066                 convert_agg(&du->agg);
1067
1068                 ops->disk_util(client, cmd);
1069                 free(cmd);
1070                 break;
1071                 }
1072         case FIO_NET_CMD_TS: {
1073                 struct cmd_ts_pdu *p = (struct cmd_ts_pdu *) cmd->payload;
1074
1075                 convert_ts(&p->ts, &p->ts);
1076                 convert_gs(&p->rs, &p->rs);
1077
1078                 ops->thread_status(client, cmd);
1079                 free(cmd);
1080                 break;
1081                 }
1082         case FIO_NET_CMD_GS: {
1083                 struct group_run_stats *gs = (struct group_run_stats *) cmd->payload;
1084
1085                 convert_gs(gs, gs);
1086
1087                 ops->group_stats(client, cmd);
1088                 free(cmd);
1089                 break;
1090                 }
1091         case FIO_NET_CMD_ETA: {
1092                 struct jobs_eta *je = (struct jobs_eta *) cmd->payload;
1093
1094                 remove_reply_cmd(client, cmd);
1095                 convert_jobs_eta(je);
1096                 handle_eta(client, cmd);
1097                 free(cmd);
1098                 break;
1099                 }
1100         case FIO_NET_CMD_PROBE:
1101                 remove_reply_cmd(client, cmd);
1102                 ops->probe(client, cmd);
1103                 free(cmd);
1104                 break;
1105         case FIO_NET_CMD_SERVER_START:
1106                 client->state = Client_running;
1107                 if (ops->job_start)
1108                         ops->job_start(client, cmd);
1109                 free(cmd);
1110                 break;
1111         case FIO_NET_CMD_START: {
1112                 struct cmd_start_pdu *pdu = (struct cmd_start_pdu *) cmd->payload;
1113
1114                 pdu->jobs = le32_to_cpu(pdu->jobs);
1115                 ops->start(client, cmd);
1116                 free(cmd);
1117                 break;
1118                 }
1119         case FIO_NET_CMD_STOP: {
1120                 struct cmd_end_pdu *pdu = (struct cmd_end_pdu *) cmd->payload;
1121
1122                 convert_stop(cmd);
1123                 client->state = Client_stopped;
1124                 client->error = le32_to_cpu(pdu->error);
1125                 client->signal = le32_to_cpu(pdu->signal);
1126                 ops->stop(client, cmd);
1127                 free(cmd);
1128                 break;
1129                 }
1130         case FIO_NET_CMD_ADD_JOB:
1131                 if (ops->add_job)
1132                         ops->add_job(client, cmd);
1133                 free(cmd);
1134                 break;
1135         case FIO_NET_CMD_IOLOG:
1136                 if (ops->iolog) {
1137                         struct cmd_iolog_pdu *pdu;
1138
1139                         pdu = convert_iolog(cmd);
1140                         ops->iolog(client, pdu);
1141                 }
1142                 free(cmd);
1143                 break;
1144         default:
1145                 log_err("fio: unknown client op: %s\n", fio_server_op(cmd->opcode));
1146                 free(cmd);
1147                 break;
1148         }
1149
1150         return 1;
1151 }
1152
1153 static void request_client_etas(struct client_ops *ops)
1154 {
1155         struct fio_client *client;
1156         struct flist_head *entry;
1157         struct client_eta *eta;
1158         int skipped = 0;
1159
1160         dprint(FD_NET, "client: request eta (%d)\n", nr_clients);
1161
1162         eta = malloc(sizeof(*eta));
1163         memset(&eta->eta, 0, sizeof(eta->eta));
1164         eta->pending = nr_clients;
1165
1166         flist_for_each(entry, &client_list) {
1167                 client = flist_entry(entry, struct fio_client, list);
1168
1169                 if (!flist_empty(&client->eta_list)) {
1170                         skipped++;
1171                         continue;
1172                 }
1173                 if (client->state != Client_running)
1174                         continue;
1175
1176                 assert(!client->eta_in_flight);
1177                 flist_add_tail(&client->eta_list, &eta_list);
1178                 client->eta_in_flight = eta;
1179                 fio_net_send_simple_cmd(client->fd, FIO_NET_CMD_SEND_ETA,
1180                                         (uintptr_t) eta, &client->cmd_list);
1181         }
1182
1183         while (skipped--)
1184                 fio_client_dec_jobs_eta(eta, ops->eta);
1185
1186         dprint(FD_NET, "client: requested eta tag %p\n", eta);
1187 }
1188
1189 static int client_check_cmd_timeout(struct fio_client *client,
1190                                     struct timeval *now)
1191 {
1192         struct fio_net_int_cmd *cmd;
1193         struct flist_head *entry, *tmp;
1194         int ret = 0;
1195
1196         flist_for_each_safe(entry, tmp, &client->cmd_list) {
1197                 cmd = flist_entry(entry, struct fio_net_int_cmd, list);
1198
1199                 if (mtime_since(&cmd->tv, now) < FIO_NET_CLIENT_TIMEOUT)
1200                         continue;
1201
1202                 log_err("fio: client %s, timeout on cmd %s\n", client->hostname,
1203                                                 fio_server_op(cmd->cmd.opcode));
1204                 flist_del(&cmd->list);
1205                 free(cmd);
1206                 ret = 1;
1207         }
1208
1209         return flist_empty(&client->cmd_list) && ret;
1210 }
1211
1212 static int fio_check_clients_timed_out(void)
1213 {
1214         struct fio_client *client;
1215         struct flist_head *entry, *tmp;
1216         struct timeval tv;
1217         int ret = 0;
1218
1219         gettimeofday(&tv, NULL);
1220
1221         flist_for_each_safe(entry, tmp, &client_list) {
1222                 client = flist_entry(entry, struct fio_client, list);
1223
1224                 if (flist_empty(&client->cmd_list))
1225                         continue;
1226
1227                 if (!client_check_cmd_timeout(client, &tv))
1228                         continue;
1229
1230                 if (client->ops->timed_out)
1231                         client->ops->timed_out(client);
1232                 else
1233                         log_err("fio: client %s timed out\n", client->hostname);
1234
1235                 remove_client(client);
1236                 ret = 1;
1237         }
1238
1239         return ret;
1240 }
1241
1242 int fio_handle_clients(struct client_ops *ops)
1243 {
1244         struct pollfd *pfds;
1245         int i, ret = 0, retval = 0;
1246
1247         gettimeofday(&eta_tv, NULL);
1248
1249         pfds = malloc(nr_clients * sizeof(struct pollfd));
1250
1251         sum_stat_clients = nr_clients;
1252         init_thread_stat(&client_ts);
1253         init_group_run_stat(&client_gs);
1254
1255         while (!exit_backend && nr_clients) {
1256                 struct flist_head *entry, *tmp;
1257                 struct fio_client *client;
1258
1259                 i = 0;
1260                 flist_for_each_safe(entry, tmp, &client_list) {
1261                         client = flist_entry(entry, struct fio_client, list);
1262
1263                         if (!client->sent_job && !client->ops->stay_connected &&
1264                             flist_empty(&client->cmd_list)) {
1265                                 remove_client(client);
1266                                 continue;
1267                         }
1268
1269                         pfds[i].fd = client->fd;
1270                         pfds[i].events = POLLIN;
1271                         i++;
1272                 }
1273
1274                 if (!nr_clients)
1275                         break;
1276
1277                 assert(i == nr_clients);
1278
1279                 do {
1280                         struct timeval tv;
1281
1282                         gettimeofday(&tv, NULL);
1283                         if (mtime_since(&eta_tv, &tv) >= ops->eta_msec) {
1284                                 request_client_etas(ops);
1285                                 memcpy(&eta_tv, &tv, sizeof(tv));
1286
1287                                 if (fio_check_clients_timed_out())
1288                                         break;
1289                         }
1290
1291                         ret = poll(pfds, nr_clients, 100);
1292                         if (ret < 0) {
1293                                 if (errno == EINTR)
1294                                         continue;
1295                                 log_err("fio: poll clients: %s\n", strerror(errno));
1296                                 break;
1297                         } else if (!ret)
1298                                 continue;
1299                 } while (ret <= 0);
1300
1301                 for (i = 0; i < nr_clients; i++) {
1302                         if (!(pfds[i].revents & POLLIN))
1303                                 continue;
1304
1305                         client = find_client_by_fd(pfds[i].fd);
1306                         if (!client) {
1307                                 log_err("fio: unknown client fd %d\n", pfds[i].fd);
1308                                 continue;
1309                         }
1310                         if (!fio_handle_client(client)) {
1311                                 log_info("client: host=%s disconnected\n",
1312                                                 client->hostname);
1313                                 remove_client(client);
1314                                 retval = 1;
1315                         } else if (client->error)
1316                                 retval = 1;
1317                         fio_put_client(client);
1318                 }
1319         }
1320
1321         free(pfds);
1322         return retval;
1323 }