/*
 * block queue tracing application
 *
 * Copyright (C) 2005 Jens Axboe <axboe@suse.de>
 * Copyright (C) 2006 Jens Axboe <axboe@kernel.dk>
 *
 * Rewrite to have a single thread per CPU (managing all devices on that CPU)
 *      Alan D. Brunelle <alan.brunelle@hp.com> - January 2009
 *
 *  This program is free software; you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License as published by
 *  the Free Software Foundation; either version 2 of the License, or
 *  (at your option) any later version.
 *
 *  This program is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *  GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License
 *  along with this program; if not, write to the Free Software
 *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 *
 */

#include <errno.h>
#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <fcntl.h>
#include <getopt.h>
#include <sched.h>
#include <unistd.h>
#include <poll.h>
#include <signal.h>
#include <pthread.h>
#include <locale.h>
#include <sys/ioctl.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/vfs.h>
#include <sys/mman.h>
#include <sys/param.h>
#include <sys/time.h>
#include <sys/resource.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <sys/sendfile.h>

#include "btt/list.h"
#include "blktrace.h"

/*
 * You may want to increase this even more, if you are logging at a high
 * rate and see skipped/missed events
 */
#define BUF_SIZE                (512 * 1024)
#define BUF_NR                  (4)

#define FILE_VBUF_SIZE          (128 * 1024)

#define DEBUGFS_TYPE            (0x64626720)
#define TRACE_NET_PORT          (8462)
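
/*
 * Note (illustrative): these are only defaults. BUF_SIZE/BUF_NR map to
 * the -b (sub-buffer size, KiB) and -n (sub-buffer count) options, and
 * TRACE_NET_PORT to -p; e.g. "blktrace -d /dev/sda -b 1024 -n 8" asks
 * BLKTRACESETUP for eight 1024 KiB sub-buffers.
 */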

enum {
        Net_none = 0,
        Net_server,
        Net_client,
};

enum thread_status {
        Th_running,
        Th_leaving,
        Th_error
};

/*
 * Generic stats collected: nevents can be _roughly_ estimated by data_read
 * (discounting pdu...)
 *
 * These fields are updated w/ pdc_dr_update & pdc_nev_update below.
 */
struct pdc_stats {
        unsigned long long data_read;
        unsigned long long nevents;
};

struct devpath {
        struct list_head head;
        char *path;                     /* path to device special file */
        char *buts_name;                /* name returned from bt kernel code */
        struct pdc_stats *stats;
        int fd, ncpus;
        unsigned long long drops;

        /*
         * For piped output only:
         *
         * Each tracer will have a tracer_devpath_head that it will add new
         * data onto. Its list is protected by its mutex
         * (tracer_devpath_head.mutex), and it signals the processing thread
         * using the dp_cond, dp_mutex & dp_entries variables (defined below).
         */
        struct tracer_devpath_head *heads;

        /*
         * For network server mode only:
         */
        struct cl_host *ch;
        u32 cl_id;
        time_t cl_connect_time;
        struct io_info *ios;
};

/*
 * For piped output to stdout we will have each tracer thread (one per dev)
 * tack buffers read from the relay queues on a per-device list.
 *
 * The main thread will then collect trace buffers from each of the lists in
 * turn.
 *
 * We will use a mutex to guard each of the trace_buf lists. The tracers
 * can then signal the main thread using <dp_cond,dp_mutex> and
 * dp_entries. (When dp_entries is 0 and a tracer adds an entry, it will
 * signal; while dp_entries is 0, the main thread will wait for that
 * condition to be signalled.)
 *
 * adb: It may be better just to have a large buffer per tracer per dev,
 * and then use it as a ring-buffer. This would certainly cut down a lot
 * of malloc/free thrashing, at the cost of more memory movements (potentially).
 */
struct trace_buf {
        struct list_head head;
        struct devpath *dpp;
        void *buf;
        int cpu, len;
};

struct tracer_devpath_head {
        pthread_mutex_t mutex;
        struct list_head head;
        struct trace_buf *prev;
};

/*
 * Used to handle the mmap() interfaces for the output file (containing
 * traces)
 */
struct mmap_info {
        void *fs_buf;
        unsigned long long fs_size, fs_max_size, fs_off, fs_buf_len;
        unsigned long buf_size, buf_nr;
        int pagesize;
};

/*
 * Each thread doing work on a (client) side of blktrace will have one
 * of these. The ios array contains input/output information, pfds holds
 * poll() data. The volatile members provide flags to/from the main
 * executing thread.
 */
struct tracer {
        struct list_head head;
        struct io_info *ios;
        struct pollfd *pfds;
        pthread_t thread;
        int cpu, nios;
        volatile int status, is_done;
};

/*
 * networking stuff follows. we include a magic number so we know whether
 * to convert endianness or not.
 *
 * The len field is overloaded:
 *      0 - Indicates an "open" - allowing the server to set up for a dev/cpu
 *      1 - Indicates a "close" - shut down the connection in an
 *          orderly fashion
 *
 * The cpu field is overloaded on close: it will contain the number of drops.
 */
struct blktrace_net_hdr {
        u32 magic;              /* same as trace magic */
        char buts_name[32];     /* trace name */
        u32 cpu;                /* for which cpu */
        u32 max_cpus;
        u32 len;                /* length of following trace data */
        u32 cl_id;              /* id for set of client per-cpu connections */
        u32 buf_size;           /* client buf_size for this trace  */
        u32 buf_nr;             /* client buf_nr for this trace  */
        u32 page_size;          /* client page_size for this trace  */
};

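/*
 * Illustrative exchange on one per-CPU client connection (data flows
 * per flush_subbuf_net()/ack_open_close() below):
 *
 *      client -> server: hdr { len = 0 }               "open" for dev/cpu
 *      server -> client: hdr { len = 2 }               ack
 *      client -> server: hdr { len = N } + N bytes     trace data
 *      ...
 *      client -> server: hdr { len = 1, cpu = drops }  "close"
 *      server -> client: hdr { len = 2 }               ack
 */
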
/*
 * Each host encountered has one of these. The head is used to link this
 * on to the network server's ch_list. Connections associated with this
 * host are linked on conn_list, and any devices traced on that host
 * are linked on the devpaths list.
 */
struct cl_host {
        struct list_head head;
        struct list_head conn_list;
        struct list_head devpaths;
        struct net_server_s *ns;
        char *hostname;
        struct in_addr cl_in_addr;
        int connects, ndevs, cl_opens;
};

/*
 * Each connection (client to server socket ('fd')) has one of these. A
 * back reference to the host ('ch'), and list heads (for the host's
 * conn_list, and the network server's conn_list) are also included.
 */
struct cl_conn {
        struct list_head ch_head, ns_head;
        struct cl_host *ch;
        int fd, ncpus;
        time_t connect_time;
};

/*
 * The network server requires some poll structures to be maintained -
 * one per connection currently on conn_list. The nchs/ch_list values
 * are for each host connected to this server. The addr field is used
 * for scratch as new connections are established.
 */
struct net_server_s {
        struct list_head conn_list;
        struct list_head ch_list;
        struct pollfd *pfds;
        int listen_fd, connects, nchs;
        struct sockaddr_in addr;
};

/*
 * This structure is (generically) used to provide information
 * for a read-to-write set of values.
 *
 * ifn & ifd represent input information
 *
 * ofn, ofd, ofp, obuf & mmap_info are used for the (optional) output file.
 */
struct io_info {
        struct devpath *dpp;
        FILE *ofp;
        char *obuf;
        struct cl_conn *nc;     /* Server network connection */

        /*
         * mmap controlled output files
         */
        struct mmap_info mmap_info;

        /*
         * Client network fields
         */
        unsigned int ready;
        unsigned long long data_queued;

        /*
         * Input/output file descriptors & names
         */
        int ifd, ofd;
        char ifn[MAXPATHLEN + 64];
        char ofn[MAXPATHLEN + 64];
};

static char blktrace_version[] = "2.0.0";

/*
 * Linkage to blktrace helper routines (trace conversions)
 */
int data_is_native = -1;

static int ndevs;
static int ncpus;
static int pagesize;
static int act_mask = ~0U;
static int kill_running_trace;
static int stop_watch;
static int piped_output;

static char *debugfs_path = "/sys/kernel/debug";
static char *output_name;
static char *output_dir;

static unsigned long buf_size = BUF_SIZE;
static unsigned long buf_nr = BUF_NR;

static FILE *pfp;

static LIST_HEAD(devpaths);
static LIST_HEAD(tracers);

static volatile int done;

/*
 * tracer threads add entries, the main thread takes them off and processes
 * them. These protect the dp_entries variable.
 */
static pthread_cond_t dp_cond = PTHREAD_COND_INITIALIZER;
static pthread_mutex_t dp_mutex = PTHREAD_MUTEX_INITIALIZER;
static volatile int dp_entries;

/*
 * These synchronize master / thread interactions.
 */
static pthread_cond_t mt_cond = PTHREAD_COND_INITIALIZER;
static pthread_mutex_t mt_mutex = PTHREAD_MUTEX_INITIALIZER;
static volatile int nthreads_running;
static volatile int nthreads_leaving;
static volatile int nthreads_error;
static volatile int tracers_run;

/*
 * network cmd line params
 */
static struct sockaddr_in hostname_addr;
static char hostname[MAXHOSTNAMELEN];
static int net_port = TRACE_NET_PORT;
static int net_use_sendfile = 1;
static int net_mode;
static int *cl_fds;

static int (*handle_pfds)(struct tracer *, int, int);
static int (*handle_list)(struct tracer_devpath_head *, struct list_head *);

#define S_OPTS  "d:a:A:r:o:kw:vVb:n:D:lh:p:sI:"
static struct option l_opts[] = {
        {
                .name = "dev",
                .has_arg = required_argument,
                .flag = NULL,
                .val = 'd'
        },
        {
                .name = "input-devs",
                .has_arg = required_argument,
                .flag = NULL,
                .val = 'I'
        },
        {
                .name = "act-mask",
                .has_arg = required_argument,
                .flag = NULL,
                .val = 'a'
        },
        {
                .name = "set-mask",
                .has_arg = required_argument,
                .flag = NULL,
                .val = 'A'
        },
        {
                .name = "relay",
                .has_arg = required_argument,
                .flag = NULL,
                .val = 'r'
        },
        {
                .name = "output",
                .has_arg = required_argument,
                .flag = NULL,
                .val = 'o'
        },
        {
                .name = "kill",
                .has_arg = no_argument,
                .flag = NULL,
                .val = 'k'
        },
        {
                .name = "stopwatch",
                .has_arg = required_argument,
                .flag = NULL,
                .val = 'w'
        },
        {
                .name = "version",
                .has_arg = no_argument,
                .flag = NULL,
                .val = 'v'
        },
        {
                .name = "version",
                .has_arg = no_argument,
                .flag = NULL,
                .val = 'V'
        },
        {
                .name = "buffer-size",
                .has_arg = required_argument,
                .flag = NULL,
                .val = 'b'
        },
        {
                .name = "num-sub-buffers",
                .has_arg = required_argument,
                .flag = NULL,
                .val = 'n'
        },
        {
                .name = "output-dir",
                .has_arg = required_argument,
                .flag = NULL,
                .val = 'D'
        },
        {
                .name = "listen",
                .has_arg = no_argument,
                .flag = NULL,
                .val = 'l'
        },
        {
                .name = "host",
                .has_arg = required_argument,
                .flag = NULL,
                .val = 'h'
        },
        {
                .name = "port",
                .has_arg = required_argument,
                .flag = NULL,
                .val = 'p'
        },
        {
                .name = "no-sendfile",
                .has_arg = no_argument,
                .flag = NULL,
                .val = 's'
        },
        {
                .name = NULL,
        }
};

static char usage_str[] = "\n\n" \
        "-d <dev>             | --dev=<dev>\n" \
        "[ -r <debugfs path>  | --relay=<debugfs path> ]\n" \
        "[ -o <file>          | --output=<file>]\n" \
        "[ -D <dir>           | --output-dir=<dir>]\n" \
        "[ -w <time>          | --stopwatch=<time>]\n" \
        "[ -a <action field>  | --act-mask=<action field>]\n" \
        "[ -A <action mask>   | --set-mask=<action mask>]\n" \
        "[ -b <size>          | --buffer-size=<size>]\n" \
        "[ -n <number>        | --num-sub-buffers=<number>]\n" \
        "[ -l                 | --listen]\n" \
        "[ -h <hostname>      | --host=<hostname>]\n" \
        "[ -p <port number>   | --port=<port number>]\n" \
        "[ -s                 | --no-sendfile]\n" \
        "[ -I <devs file>     | --input-devs=<devs file>]\n" \
        "[ -v                 | --version]\n" \
        "[ -V                 | --version]\n" \

        "\t-d Use specified device. May also be given last after options\n" \
        "\t-r Path to mounted debugfs, defaults to /sys/kernel/debug\n" \
        "\t-o File(s) to send output to\n" \
        "\t-D Directory to prepend to output file names\n" \
        "\t-w Stop after defined time, in seconds\n" \
        "\t-a Only trace specified actions. See documentation\n" \
        "\t-A Give trace mask as a single value. See documentation\n" \
        "\t-b Sub buffer size in KiB (default 512)\n" \
        "\t-n Number of sub buffers (default 4)\n" \
        "\t-l Run in network listen mode (blktrace server)\n" \
        "\t-h Run in network client mode, connecting to the given host\n" \
        "\t-p Network port to use (default 8462)\n" \
        "\t-s Make the network client NOT use sendfile() to transfer data\n" \
        "\t-I Add devices found in <devs file>\n" \
        "\t-v Print program version info\n" \
        "\t-V Print program version info\n\n";

static void clear_events(struct pollfd *pfd)
{
        pfd->events = 0;
        pfd->revents = 0;
}

static inline int net_client_use_sendfile(void)
{
        return net_mode == Net_client && net_use_sendfile;
}

static inline int net_client_use_send(void)
{
        return net_mode == Net_client && !net_use_sendfile;
}

static inline int use_tracer_devpaths(void)
{
        return piped_output || net_client_use_send();
}

static inline int in_addr_eq(struct in_addr a, struct in_addr b)
{
        return a.s_addr == b.s_addr;
}

static inline void pdc_dr_update(struct devpath *dpp, int cpu, int data_read)
{
        dpp->stats[cpu].data_read += data_read;
}

static inline void pdc_nev_update(struct devpath *dpp, int cpu, int nevents)
{
        dpp->stats[cpu].nevents += nevents;
}

static void show_usage(char *prog)
{
        fprintf(stderr, "Usage: %s %s", prog, usage_str);
}

/*
 * Create a timespec 'msec' milliseconds into the future
 */
static inline void make_timespec(struct timespec *tsp, long delta_msec)
{
        struct timeval now;

        gettimeofday(&now, NULL);
        tsp->tv_sec = now.tv_sec;
        tsp->tv_nsec = 1000L * now.tv_usec;

        tsp->tv_nsec += (delta_msec * 1000000L);
        if (tsp->tv_nsec >= 1000000000L) {
                long secs = tsp->tv_nsec / 1000000000L;

                tsp->tv_sec += secs;
                tsp->tv_nsec -= (secs * 1000000000L);
        }
}

/*
 * Add a timer to ensure wait ends
 */
static void t_pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)
{
        struct timespec ts;

        make_timespec(&ts, 50);
        pthread_cond_timedwait(cond, mutex, &ts);
}

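/*
 * Illustrative use: callers loop on their predicate, e.g.
 *
 *      while (!done && dp_entries == 0)
 *              t_pthread_cond_wait(&dp_cond, &dp_mutex);
 *
 * The 50ms timeout bounds each wait so the loop re-checks 'done' even
 * if no signal ever arrives.
 */
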
static void unblock_tracers(void)
{
        pthread_mutex_lock(&mt_mutex);
        tracers_run = 1;
        pthread_cond_broadcast(&mt_cond);
        pthread_mutex_unlock(&mt_mutex);
}

static void tracer_wait_unblock(struct tracer *tp)
{
        pthread_mutex_lock(&mt_mutex);
        while (!tp->is_done && !tracers_run)
                pthread_cond_wait(&mt_cond, &mt_mutex);
        pthread_mutex_unlock(&mt_mutex);
}

static void tracer_signal_ready(struct tracer *tp,
                                enum thread_status th_status,
                                int status)
{
        pthread_mutex_lock(&mt_mutex);
        tp->status = status;

        if (th_status == Th_running)
                nthreads_running++;
        else if (th_status == Th_error)
                nthreads_error++;
        else
                nthreads_leaving++;

        pthread_cond_signal(&mt_cond);
        pthread_mutex_unlock(&mt_mutex);
}

static void wait_tracers_ready(int ncpus_started)
{
        pthread_mutex_lock(&mt_mutex);
        while ((nthreads_running + nthreads_error) < ncpus_started)
                t_pthread_cond_wait(&mt_cond, &mt_mutex);
        pthread_mutex_unlock(&mt_mutex);
}

static void wait_tracers_leaving(void)
{
        pthread_mutex_lock(&mt_mutex);
        while (nthreads_leaving < nthreads_running)
                t_pthread_cond_wait(&mt_cond, &mt_mutex);
        pthread_mutex_unlock(&mt_mutex);
}

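/*
 * The master/tracer rendezvous built from the helpers above (main
 * thread on the left, each tracer thread on the right):
 *
 *      wait_tracers_ready()    <--  tracer_signal_ready(Th_running)
 *      unblock_tracers()       -->  tracer_wait_unblock()
 *      wait_tracers_leaving()  <--  tracer_signal_ready(Th_leaving)
 */
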
static void init_mmap_info(struct mmap_info *mip)
{
        mip->buf_size = buf_size;
        mip->buf_nr = buf_nr;
        mip->pagesize = pagesize;
}

static void net_close_connection(int *fd)
{
        shutdown(*fd, SHUT_RDWR);
        close(*fd);
        *fd = -1;
}

static void dpp_free(struct devpath *dpp)
{
        if (dpp->stats)
                free(dpp->stats);
        if (dpp->ios)
                free(dpp->ios);
        if (dpp->path)
                free(dpp->path);
        if (dpp->buts_name)
                free(dpp->buts_name);
        free(dpp);
}

static int lock_on_cpu(int cpu)
{
        cpu_set_t *cpu_mask;
        size_t size;

        cpu_mask = CPU_ALLOC(ncpus);
        if (!cpu_mask)
                return errno;
        size = CPU_ALLOC_SIZE(ncpus);

        CPU_ZERO_S(size, cpu_mask);
        CPU_SET_S(cpu, size, cpu_mask);
        if (sched_setaffinity(0, size, cpu_mask) < 0) {
                CPU_FREE(cpu_mask);
                return errno;
        }

        CPU_FREE(cpu_mask);
        return 0;
}

static int increase_limit(int resource, rlim_t increase)
{
        struct rlimit rlim;
        int save_errno = errno;

        if (!getrlimit(resource, &rlim)) {
                rlim.rlim_cur += increase;
                if (rlim.rlim_cur >= rlim.rlim_max)
                        rlim.rlim_max = rlim.rlim_cur + increase;

                if (!setrlimit(resource, &rlim))
                        return 1;
        }

        errno = save_errno;
        return 0;
}

static int handle_open_failure(void)
{
        if (errno == ENFILE || errno == EMFILE)
                return increase_limit(RLIMIT_NOFILE, 16);
        return 0;
}

static int handle_mem_failure(size_t length)
{
        if (errno == ENFILE)
                return handle_open_failure();
        else if (errno == ENOMEM)
                return increase_limit(RLIMIT_MEMLOCK, 2 * length);
        return 0;
}

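/*
 * The my_*() wrappers below retry their call for as long as the failure
 * handler reports progress, i.e. while increase_limit() managed to
 * raise RLIMIT_NOFILE or RLIMIT_MEMLOCK. Once a limit can no longer be
 * raised, the error is handed back to the caller.
 */
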
static FILE *my_fopen(const char *path, const char *mode)
{
        FILE *fp;

        do {
                fp = fopen(path, mode);
        } while (fp == NULL && handle_open_failure());

        return fp;
}

static int my_open(const char *path, int flags)
{
        int fd;

        do {
                fd = open(path, flags);
        } while (fd < 0 && handle_open_failure());

        return fd;
}

static int my_socket(int domain, int type, int protocol)
{
        int fd;

        do {
                fd = socket(domain, type, protocol);
        } while (fd < 0 && handle_open_failure());

        return fd;
}

static int my_accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen)
{
        int fd;

        do {
                fd = accept(sockfd, addr, addrlen);
        } while (fd < 0 && handle_open_failure());

        return fd;
}

static void *my_mmap(void *addr, size_t length, int prot, int flags, int fd,
                     off_t offset)
{
        void *new;

        do {
                new = mmap(addr, length, prot, flags, fd, offset);
        } while (new == MAP_FAILED && handle_mem_failure(length));

        return new;
}

static int my_mlock(struct tracer *tp,
                    const void *addr, size_t len)
{
        int ret, retry = 0;

        do {
                ret = mlock(addr, len);
                if ((retry >= 10) && tp && tp->is_done)
                        break;
                retry++;
        } while (ret < 0 && handle_mem_failure(len));

        return ret;
}

static int setup_mmap(int fd, unsigned int maxlen,
                      struct mmap_info *mip,
                      struct tracer *tp)
{
        if (mip->fs_off + maxlen > mip->fs_buf_len) {
                unsigned long nr = max(16, mip->buf_nr);

                if (mip->fs_buf) {
                        munlock(mip->fs_buf, mip->fs_buf_len);
                        munmap(mip->fs_buf, mip->fs_buf_len);
                        mip->fs_buf = NULL;
                }

                mip->fs_off = mip->fs_size & (mip->pagesize - 1);
                mip->fs_buf_len = (nr * mip->buf_size) - mip->fs_off;
                mip->fs_max_size += mip->fs_buf_len;

                if (ftruncate(fd, mip->fs_max_size) < 0) {
                        perror("setup_mmap: ftruncate");
                        return 1;
                }

                mip->fs_buf = my_mmap(NULL, mip->fs_buf_len, PROT_WRITE,
                                      MAP_SHARED, fd,
                                      mip->fs_size - mip->fs_off);
                if (mip->fs_buf == MAP_FAILED) {
                        perror("setup_mmap: mmap");
                        return 1;
                }
                if (my_mlock(tp, mip->fs_buf, mip->fs_buf_len) < 0) {
                        perror("setup_mmap: mlock");
                        return 1;
                }
        }

        return 0;
}

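/*
 * setup_mmap() keeps a sliding, mlock()ed window of the output file
 * mapped: the file is grown ahead of time with ftruncate(), relay reads
 * land at fs_buf + fs_off, and close_iop() later trims the file back to
 * the bytes actually written (fs_size).
 */
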
static int __stop_trace(int fd)
{
        /*
         * Should be stopped, don't complain if it isn't
         */
        ioctl(fd, BLKTRACESTOP);
        return ioctl(fd, BLKTRACETEARDOWN);
}

static int write_data(char *buf, int len)
{
        int ret;

rewrite:
        ret = fwrite(buf, len, 1, pfp);
        if (ferror(pfp) || ret != 1) {
                if (errno == EINTR) {
                        clearerr(pfp);
                        goto rewrite;
                }

                if (!piped_output || (errno != EPIPE && errno != EBADF)) {
                        fprintf(stderr, "write(%d) failed: %d/%s\n",
                                len, errno, strerror(errno));
                }
                goto err;
        }

        fflush(pfp);
        return 0;

err:
        clearerr(pfp);
        return 1;
}

/*
 * Returns the number of bytes read (successfully)
 */
static int __net_recv_data(int fd, void *buf, unsigned int len)
{
        unsigned int bytes_left = len;

        while (bytes_left && !done) {
                int ret = recv(fd, buf, bytes_left, MSG_WAITALL);

                if (ret == 0)
                        break;
                else if (ret < 0) {
                        if (errno == EAGAIN) {
                                usleep(50);
                                continue;
                        }
                        perror("server: net_recv_data: recv failed");
                        break;
                } else {
                        buf += ret;
                        bytes_left -= ret;
                }
        }

        return len - bytes_left;
}

static int net_recv_data(int fd, void *buf, unsigned int len)
{
        return __net_recv_data(fd, buf, len);
}

/*
 * Returns number of bytes written
 */
static int net_send_data(int fd, void *buf, unsigned int buf_len)
{
        int ret;
        unsigned int bytes_left = buf_len;

        while (bytes_left) {
                ret = send(fd, buf, bytes_left, 0);
                if (ret < 0) {
                        perror("send");
                        break;
                }

                buf += ret;
                bytes_left -= ret;
        }

        return buf_len - bytes_left;
}

static int net_send_header(int fd, int cpu, char *buts_name, int len)
{
        struct blktrace_net_hdr hdr;

        memset(&hdr, 0, sizeof(hdr));

        hdr.magic = BLK_IO_TRACE_MAGIC;
        memset(hdr.buts_name, 0, sizeof(hdr.buts_name));
        strncpy(hdr.buts_name, buts_name, sizeof(hdr.buts_name));
        hdr.buts_name[sizeof(hdr.buts_name) - 1] = '\0';
        hdr.cpu = cpu;
        hdr.max_cpus = ncpus;
        hdr.len = len;
        hdr.cl_id = getpid();
        hdr.buf_size = buf_size;
        hdr.buf_nr = buf_nr;
        hdr.page_size = pagesize;

        return net_send_data(fd, &hdr, sizeof(hdr)) != sizeof(hdr);
}

static void net_send_open_close(int fd, int cpu, char *buts_name, int len)
{
        struct blktrace_net_hdr ret_hdr;

        net_send_header(fd, cpu, buts_name, len);
        net_recv_data(fd, &ret_hdr, sizeof(ret_hdr));
}

static void net_send_open(int fd, int cpu, char *buts_name)
{
        net_send_open_close(fd, cpu, buts_name, 0);
}

static void net_send_close(int fd, char *buts_name, int drops)
{
        /*
         * Overload CPU w/ number of drops
         *
         * XXX: Need to clear/set done around call - done=1 (which
         * is true here) stops reads from happening... :-(
         */
        done = 0;
        net_send_open_close(fd, drops, buts_name, 1);
        done = 1;
}

static void ack_open_close(int fd, char *buts_name)
{
        net_send_header(fd, 0, buts_name, 2);
}

static void net_send_drops(int fd)
{
        struct list_head *p;

        __list_for_each(p, &devpaths) {
                struct devpath *dpp = list_entry(p, struct devpath, head);

                net_send_close(fd, dpp->buts_name, dpp->drops);
        }
}

/*
 * Returns:
 *       0: "EOF"
 *       1: OK
 *      -1: Error
 */
static int net_get_header(struct cl_conn *nc, struct blktrace_net_hdr *bnh)
{
        int bytes_read;
        int fl = fcntl(nc->fd, F_GETFL);

        fcntl(nc->fd, F_SETFL, fl | O_NONBLOCK);
        bytes_read = __net_recv_data(nc->fd, bnh, sizeof(*bnh));
        fcntl(nc->fd, F_SETFL, fl & ~O_NONBLOCK);

        if (bytes_read == sizeof(*bnh))
                return 1;
        else if (bytes_read == 0)
                return 0;
        else
                return -1;
}

static int net_setup_addr(void)
{
        struct sockaddr_in *addr = &hostname_addr;

        memset(addr, 0, sizeof(*addr));
        addr->sin_family = AF_INET;
        addr->sin_port = htons(net_port);

        if (inet_aton(hostname, &addr->sin_addr) != 1) {
                struct hostent *hent;
retry:
                hent = gethostbyname(hostname);
                if (!hent) {
                        if (h_errno == TRY_AGAIN) {
                                usleep(100);
                                goto retry;
                        } else if (h_errno == NO_RECOVERY) {
                                fprintf(stderr, "gethostbyname(%s): "
                                        "non-recoverable error encountered\n",
                                        hostname);
                        } else {
                                /*
                                 * HOST_NOT_FOUND, NO_ADDRESS or NO_DATA
                                 */
                                fprintf(stderr, "Host %s not found\n",
                                        hostname);
                        }
                        return 1;
                }

                memcpy(&addr->sin_addr, hent->h_addr, 4);
                memset(hostname, 0, sizeof(hostname));
                strncpy(hostname, hent->h_name, sizeof(hostname));
                hostname[sizeof(hostname) - 1] = '\0';
        }

        return 0;
}

static int net_setup_client(void)
{
        int fd;
        struct sockaddr_in *addr = &hostname_addr;

        fd = my_socket(AF_INET, SOCK_STREAM, 0);
        if (fd < 0) {
                perror("client: socket");
                return -1;
        }

        if (connect(fd, (struct sockaddr *)addr, sizeof(*addr)) < 0) {
                if (errno == ECONNREFUSED)
                        fprintf(stderr,
                                "\nclient: Connection to %s refused, "
                                "perhaps the server is not started?\n\n",
                                hostname);
                else
                        perror("client: connect");

                close(fd);
                return -1;
        }

        return fd;
}

static int open_client_connections(void)
{
        int cpu;

        cl_fds = calloc(ncpus, sizeof(*cl_fds));
        for (cpu = 0; cpu < ncpus; cpu++) {
                cl_fds[cpu] = net_setup_client();
                if (cl_fds[cpu] < 0)
                        goto err;
        }
        return 0;

err:
        while (cpu > 0)
                close(cl_fds[--cpu]);
        free(cl_fds);
        return 1;
}

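/*
 * In client "send" mode there is one TCP connection per CPU: tracer
 * threads index cl_fds[] by the CPU they are bound to, so the per-CPU
 * sub-buffer streams never interleave on a single socket.
 */
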
static void close_client_connections(void)
{
        if (cl_fds) {
                int cpu, *fdp;

                for (cpu = 0, fdp = cl_fds; cpu < ncpus; cpu++, fdp++) {
                        if (*fdp >= 0) {
                                net_send_drops(*fdp);
                                net_close_connection(fdp);
                        }
                }
                free(cl_fds);
        }
}

static void setup_buts(void)
{
        struct list_head *p;

        __list_for_each(p, &devpaths) {
                struct blk_user_trace_setup buts;
                struct devpath *dpp = list_entry(p, struct devpath, head);

                memset(&buts, 0, sizeof(buts));
                buts.buf_size = buf_size;
                buts.buf_nr = buf_nr;
                buts.act_mask = act_mask;

                if (ioctl(dpp->fd, BLKTRACESETUP, &buts) >= 0) {
                        dpp->ncpus = ncpus;
                        dpp->buts_name = strdup(buts.name);
                        if (dpp->stats)
                                free(dpp->stats);
                        dpp->stats = calloc(dpp->ncpus, sizeof(*dpp->stats));
                        memset(dpp->stats, 0, dpp->ncpus * sizeof(*dpp->stats));
                } else
                        fprintf(stderr, "BLKTRACESETUP(2) %s failed: %d/%s\n",
                                dpp->path, errno, strerror(errno));
        }
}

static void start_buts(void)
{
        struct list_head *p;

        __list_for_each(p, &devpaths) {
                struct devpath *dpp = list_entry(p, struct devpath, head);

                if (ioctl(dpp->fd, BLKTRACESTART) < 0) {
                        fprintf(stderr, "BLKTRACESTART %s failed: %d/%s\n",
                                dpp->path, errno, strerror(errno));
                }
        }
}

static int get_drops(struct devpath *dpp)
{
        int fd, len, drops = 0;
        char fn[MAXPATHLEN + 64], tmp[256];

        snprintf(fn, sizeof(fn), "%s/block/%s/dropped", debugfs_path,
                 dpp->buts_name);

        fd = my_open(fn, O_RDONLY);
        if (fd < 0) {
                /*
                 * This may be ok: the kernel may not support
                 * dropped counts.
                 */
                if (errno != ENOENT)
                        fprintf(stderr, "Could not open %s: %d/%s\n",
                                fn, errno, strerror(errno));
                return 0;
        } else if ((len = read(fd, tmp, sizeof(tmp) - 1)) < 0) {
                fprintf(stderr, "Could not read %s: %d/%s\n",
                        fn, errno, strerror(errno));
        } else {
                tmp[len] = '\0';
                drops = atoi(tmp);
        }
        close(fd);

        return drops;
}

static void get_all_drops(void)
{
        struct list_head *p;

        __list_for_each(p, &devpaths) {
                struct devpath *dpp = list_entry(p, struct devpath, head);

                dpp->drops = get_drops(dpp);
        }
}

static inline struct trace_buf *alloc_trace_buf(int cpu, int bufsize)
{
        struct trace_buf *tbp;

        tbp = malloc(sizeof(*tbp) + bufsize);
        INIT_LIST_HEAD(&tbp->head);
        tbp->len = 0;
        tbp->buf = (void *)(tbp + 1);
        tbp->cpu = cpu;
        tbp->dpp = NULL;        /* Will be set when tbp is added */

        return tbp;
}

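/*
 * Note the single-allocation layout: the trace_buf header and its
 * payload come from one malloc(), with tbp->buf pointing just past the
 * struct. tb_combine() below relies on this when it realloc()s the
 * pair as a unit.
 */
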
static void free_tracer_heads(struct devpath *dpp)
{
        int cpu;
        struct tracer_devpath_head *hd;

        for (cpu = 0, hd = dpp->heads; cpu < ncpus; cpu++, hd++) {
                if (hd->prev)
                        free(hd->prev);

                pthread_mutex_destroy(&hd->mutex);
        }
        free(dpp->heads);
}

static int setup_tracer_devpaths(void)
{
        struct list_head *p;

        if (net_client_use_send())
                if (open_client_connections())
                        return 1;

        __list_for_each(p, &devpaths) {
                int cpu;
                struct tracer_devpath_head *hd;
                struct devpath *dpp = list_entry(p, struct devpath, head);

                dpp->heads = calloc(ncpus, sizeof(struct tracer_devpath_head));
                for (cpu = 0, hd = dpp->heads; cpu < ncpus; cpu++, hd++) {
                        INIT_LIST_HEAD(&hd->head);
                        pthread_mutex_init(&hd->mutex, NULL);
                        hd->prev = NULL;
                }
        }

        return 0;
}

static inline void add_trace_buf(struct devpath *dpp, int cpu,
                                                struct trace_buf **tbpp)
{
        struct trace_buf *tbp = *tbpp;
        struct tracer_devpath_head *hd = &dpp->heads[cpu];

        tbp->dpp = dpp;

        pthread_mutex_lock(&hd->mutex);
        list_add_tail(&tbp->head, &hd->head);
        pthread_mutex_unlock(&hd->mutex);

        *tbpp = alloc_trace_buf(cpu, buf_size);
}

static inline void incr_entries(int entries_handled)
{
        pthread_mutex_lock(&dp_mutex);
        if (dp_entries == 0)
                pthread_cond_signal(&dp_cond);
        dp_entries += entries_handled;
        pthread_mutex_unlock(&dp_mutex);
}

static void decr_entries(int handled)
{
        pthread_mutex_lock(&dp_mutex);
        dp_entries -= handled;
        pthread_mutex_unlock(&dp_mutex);
}

static int wait_empty_entries(void)
{
        pthread_mutex_lock(&dp_mutex);
        while (!done && dp_entries == 0)
                t_pthread_cond_wait(&dp_cond, &dp_mutex);
        pthread_mutex_unlock(&dp_mutex);

        return !done;
}

static int add_devpath(char *path)
{
        int fd;
        struct devpath *dpp;
        struct list_head *p;

        /*
         * Verify device is not duplicated
         */
        __list_for_each(p, &devpaths) {
                struct devpath *tmp = list_entry(p, struct devpath, head);
                if (!strcmp(tmp->path, path))
                        return 0;
        }
        /*
         * Verify device is valid before going too far
         */
        fd = my_open(path, O_RDONLY | O_NONBLOCK);
        if (fd < 0) {
                fprintf(stderr, "Invalid path %s specified: %d/%s\n",
                        path, errno, strerror(errno));
                return 1;
        }

        dpp = malloc(sizeof(*dpp));
        memset(dpp, 0, sizeof(*dpp));
        dpp->path = strdup(path);
        dpp->fd = fd;
        ndevs++;
        list_add_tail(&dpp->head, &devpaths);

        return 0;
}

static void rel_devpaths(void)
{
        struct list_head *p, *q;

        list_for_each_safe(p, q, &devpaths) {
                struct devpath *dpp = list_entry(p, struct devpath, head);

                list_del(&dpp->head);
                __stop_trace(dpp->fd);
                close(dpp->fd);

                if (dpp->heads)
                        free_tracer_heads(dpp);

                dpp_free(dpp);
                ndevs--;
        }
}

static int flush_subbuf_net(struct trace_buf *tbp)
{
        int fd = cl_fds[tbp->cpu];
        struct devpath *dpp = tbp->dpp;

        if (net_send_header(fd, tbp->cpu, dpp->buts_name, tbp->len))
                return 1;
        else if (net_send_data(fd, tbp->buf, tbp->len) != tbp->len)
                return 1;

        return 0;
}

static int
handle_list_net(__attribute__((__unused__))struct tracer_devpath_head *hd,
                struct list_head *list)
{
        struct trace_buf *tbp;
        struct list_head *p, *q;
        int entries_handled = 0;

        list_for_each_safe(p, q, list) {
                tbp = list_entry(p, struct trace_buf, head);

                list_del(&tbp->head);
                entries_handled++;

                if (cl_fds[tbp->cpu] >= 0) {
                        if (flush_subbuf_net(tbp)) {
                                close(cl_fds[tbp->cpu]);
                                cl_fds[tbp->cpu] = -1;
                        }
                }

                free(tbp);
        }

        return entries_handled;
}

/*
 * Tack 'tbp's buf onto the tail of 'prev's buf
 */
static struct trace_buf *tb_combine(struct trace_buf *prev,
                                    struct trace_buf *tbp)
{
        unsigned long tot_len;

        tot_len = prev->len + tbp->len;
        if (tot_len > buf_size) {
                /*
                 * prev->head isn't connected (it was taken off of the
                 * list when it became 'prev'). Therefore, we can
                 * realloc the whole structure, as the other fields
                 * are "static".
                 */
                prev = realloc(prev, sizeof(*prev) + tot_len);
                prev->buf = (void *)(prev + 1);
        }

        memcpy(prev->buf + prev->len, tbp->buf, tbp->len);
        prev->len = tot_len;

        free(tbp);
        return prev;
}

static int handle_list_file(struct tracer_devpath_head *hd,
                            struct list_head *list)
{
        int off, t_len, nevents;
        struct blk_io_trace *t;
        struct list_head *p, *q;
        int entries_handled = 0;
        struct trace_buf *tbp, *prev;

        prev = hd->prev;
        list_for_each_safe(p, q, list) {
                tbp = list_entry(p, struct trace_buf, head);
                list_del(&tbp->head);
                entries_handled++;

                /*
                 * If there was some leftover before, tack this new
                 * entry onto the tail of the previous one.
                 */
                if (prev)
                        tbp = tb_combine(prev, tbp);

                /*
                 * See how many whole traces there are - send them
                 * all out in one go.
                 */
                off = 0;
                nevents = 0;
                while (off + (int)sizeof(*t) <= tbp->len) {
                        t = (struct blk_io_trace *)(tbp->buf + off);
                        t_len = sizeof(*t) + t->pdu_len;
                        if (off + t_len > tbp->len)
                                break;

                        off += t_len;
                        nevents++;
                }
                if (nevents)
                        pdc_nev_update(tbp->dpp, tbp->cpu, nevents);

                /*
                 * Write any full set of traces, any remaining data is kept
                 * for the next pass.
                 */
                if (off) {
                        if (write_data(tbp->buf, off) || off == tbp->len) {
                                free(tbp);
                                prev = NULL;
                        } else {
                                /*
                                 * Move valid data to beginning of buffer
                                 */
                                tbp->len -= off;
                                memmove(tbp->buf, tbp->buf + off, tbp->len);
                                prev = tbp;
                        }
                } else
                        prev = tbp;
        }
        hd->prev = prev;

        return entries_handled;
}

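/*
 * Trace framing, as scanned above (illustrative): each event is a
 * struct blk_io_trace followed by pdu_len bytes of payload --
 *
 *      |<- sizeof(*t) ->|<- t->pdu_len ->|<- sizeof(*t) ->| ...
 *
 * A sub-buffer read can end mid-event, so the tail fragment is kept in
 * hd->prev and stitched onto the next buffer via tb_combine().
 */
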
static void __process_trace_bufs(void)
{
        int cpu;
        struct list_head *p;
        struct list_head list;
        int handled = 0;

        __list_for_each(p, &devpaths) {
                struct devpath *dpp = list_entry(p, struct devpath, head);
                struct tracer_devpath_head *hd = dpp->heads;

                for (cpu = 0; cpu < ncpus; cpu++, hd++) {
                        pthread_mutex_lock(&hd->mutex);
                        if (list_empty(&hd->head)) {
                                pthread_mutex_unlock(&hd->mutex);
                                continue;
                        }

                        list_replace_init(&hd->head, &list);
                        pthread_mutex_unlock(&hd->mutex);

                        handled += handle_list(hd, &list);
                }
        }

        if (handled)
                decr_entries(handled);
}

static void process_trace_bufs(void)
{
        while (wait_empty_entries())
                __process_trace_bufs();
}

static void clean_trace_bufs(void)
{
        /*
         * No mutex needed here: we're only reading from the lists,
         * tracers are done
         */
        while (dp_entries)
                __process_trace_bufs();
}

static inline void read_err(int cpu, char *ifn)
{
        if (errno != EAGAIN)
                fprintf(stderr, "Thread %d failed read of %s: %d/%s\n",
                        cpu, ifn, errno, strerror(errno));
}

static int net_sendfile(struct io_info *iop)
{
        int ret;

        ret = sendfile(iop->ofd, iop->ifd, NULL, iop->ready);
        if (ret < 0) {
                perror("sendfile");
                return 1;
        } else if (ret < (int)iop->ready) {
                fprintf(stderr, "short sendfile send (%d of %d)\n",
                        ret, iop->ready);
                return 1;
        }

        return 0;
}

static inline int net_sendfile_data(struct tracer *tp, struct io_info *iop)
{
        struct devpath *dpp = iop->dpp;

        if (net_send_header(iop->ofd, tp->cpu, dpp->buts_name, iop->ready))
                return 1;
        return net_sendfile(iop);
}

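/*
 * In sendfile mode the header still travels via send(), but the trace
 * payload is spliced straight from the per-CPU relay file descriptor
 * (ifd) to the socket (ofd), avoiding a copy through user space.
 */
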
static int fill_ofname(struct io_info *iop, int cpu)
{
        int len;
        struct stat sb;
        char *dst = iop->ofn;

        if (output_dir)
                len = snprintf(iop->ofn, sizeof(iop->ofn), "%s/", output_dir);
        else
                len = snprintf(iop->ofn, sizeof(iop->ofn), "./");

        if (net_mode == Net_server) {
                struct cl_conn *nc = iop->nc;

                len += sprintf(dst + len, "%s-", nc->ch->hostname);
                len += strftime(dst + len, 64, "%F-%T/",
                                gmtime(&iop->dpp->cl_connect_time));
        }

        if (stat(iop->ofn, &sb) < 0) {
                if (errno != ENOENT) {
                        fprintf(stderr,
                                "Destination dir %s stat failed: %d/%s\n",
                                iop->ofn, errno, strerror(errno));
                        return 1;
                }
                /*
                 * There is no synchronization between multiple threads
                 * trying to create the directory at once.  It's harmless
                 * to let them try, so just detect the problem and move on.
                 */
                if (mkdir(iop->ofn, 0755) < 0 && errno != EEXIST) {
                        fprintf(stderr,
                                "Destination dir %s can't be made: %d/%s\n",
                                iop->ofn, errno, strerror(errno));
                        return 1;
                }
        }

        if (output_name)
                snprintf(iop->ofn + len, sizeof(iop->ofn) - len,
                         "%s.blktrace.%d", output_name, cpu);
        else
                snprintf(iop->ofn + len, sizeof(iop->ofn) - len,
                         "%s.blktrace.%d", iop->dpp->buts_name, cpu);

        return 0;
}

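/*
 * Resulting names (illustrative): with no options, traces for sda land
 * in ./sda.blktrace.<cpu>; in server mode they become e.g.
 * <dir>/<hostname>-YYYY-MM-DD-HH:MM:SS/<name>.blktrace.<cpu>, keyed by
 * the client's connect time.
 */
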
static int set_vbuf(struct io_info *iop, int mode, size_t size)
{
        iop->obuf = malloc(size);
        if (setvbuf(iop->ofp, iop->obuf, mode, size)) {
                fprintf(stderr, "setvbuf(%s, %d) failed: %d/%s\n",
                        iop->dpp->path, (int)size, errno,
                        strerror(errno));
                free(iop->obuf);
                return 1;
        }

        return 0;
}

static int iop_open(struct io_info *iop, int cpu)
{
        iop->ofd = -1;
        if (fill_ofname(iop, cpu))
                return 1;

        iop->ofp = my_fopen(iop->ofn, "w+");
        if (iop->ofp == NULL) {
                fprintf(stderr, "Open output file %s failed: %d/%s\n",
                        iop->ofn, errno, strerror(errno));
                return 1;
        }

        if (set_vbuf(iop, _IOLBF, FILE_VBUF_SIZE)) {
                fprintf(stderr, "set_vbuf for file %s failed: %d/%s\n",
                        iop->ofn, errno, strerror(errno));
                fclose(iop->ofp);
                return 1;
        }

        iop->ofd = fileno(iop->ofp);
        return 0;
}

static void close_iop(struct io_info *iop)
{
        struct mmap_info *mip = &iop->mmap_info;

        if (mip->fs_buf)
                munmap(mip->fs_buf, mip->fs_buf_len);

        if (!piped_output) {
                if (ftruncate(fileno(iop->ofp), mip->fs_size) < 0) {
                        fprintf(stderr,
                                "Ignoring err: ftruncate(%s): %d/%s\n",
                                iop->ofn, errno, strerror(errno));
                }
        }

        if (iop->ofp)
                fclose(iop->ofp);
        if (iop->obuf)
                free(iop->obuf);
}

static void close_ios(struct tracer *tp)
{
        while (tp->nios > 0) {
                struct io_info *iop = &tp->ios[--tp->nios];

                iop->dpp->drops = get_drops(iop->dpp);
                if (iop->ifd >= 0)
                        close(iop->ifd);

                if (iop->ofp)
                        close_iop(iop);
                else if (iop->ofd >= 0) {
                        struct devpath *dpp = iop->dpp;

                        net_send_close(iop->ofd, dpp->buts_name, dpp->drops);
                        net_close_connection(&iop->ofd);
                }
        }

        free(tp->ios);
        free(tp->pfds);
}

static int open_ios(struct tracer *tp)
{
        struct pollfd *pfd;
        struct io_info *iop;
        struct list_head *p;

        tp->ios = calloc(ndevs, sizeof(struct io_info));
        memset(tp->ios, 0, ndevs * sizeof(struct io_info));

        tp->pfds = calloc(ndevs, sizeof(struct pollfd));
        memset(tp->pfds, 0, ndevs * sizeof(struct pollfd));

        tp->nios = 0;
        iop = tp->ios;
        pfd = tp->pfds;
        __list_for_each(p, &devpaths) {
                struct devpath *dpp = list_entry(p, struct devpath, head);

                iop->dpp = dpp;
                iop->ofd = -1;
                snprintf(iop->ifn, sizeof(iop->ifn), "%s/block/%s/trace%d",
                        debugfs_path, dpp->buts_name, tp->cpu);

                iop->ifd = my_open(iop->ifn, O_RDONLY | O_NONBLOCK);
                if (iop->ifd < 0) {
                        fprintf(stderr, "Thread %d failed open %s: %d/%s\n",
                                tp->cpu, iop->ifn, errno, strerror(errno));
                        return 1;
                }

                init_mmap_info(&iop->mmap_info);

                pfd->fd = iop->ifd;
                pfd->events = POLLIN;

                if (piped_output)
                        ;
                else if (net_client_use_sendfile()) {
                        iop->ofd = net_setup_client();
                        if (iop->ofd < 0)
                                goto err;
                        net_send_open(iop->ofd, tp->cpu, dpp->buts_name);
                } else if (net_mode == Net_none) {
                        if (iop_open(iop, tp->cpu))
                                goto err;
                } else {
                        /*
                         * This ensures that the server knows about all
                         * connections & devices before _any_ closes
                         */
                        net_send_open(cl_fds[tp->cpu], tp->cpu, dpp->buts_name);
                }

                pfd++;
                iop++;
                tp->nios++;
        }

        return 0;

err:
        close(iop->ifd);        /* tp->nios _not_ bumped */
        close_ios(tp);
        return 1;
}

1693 static int handle_pfds_file(struct tracer *tp, int nevs, int force_read)
1694 {
1695         struct mmap_info *mip;
1696         int i, ret, nentries = 0;
1697         struct pollfd *pfd = tp->pfds;
1698         struct io_info *iop = tp->ios;
1699
1700         for (i = 0; nevs > 0 && i < ndevs; i++, pfd++, iop++) {
1701                 if (pfd->revents & POLLIN || force_read) {
1702                         mip = &iop->mmap_info;
1703
1704                         ret = setup_mmap(iop->ofd, buf_size, mip, tp);
1705                         if (ret < 0) {
1706                                 pfd->events = 0;
1707                                 break;
1708                         }
1709
1710                         ret = read(iop->ifd, mip->fs_buf + mip->fs_off,
1711                                    buf_size);
1712                         if (ret > 0) {
1713                                 pdc_dr_update(iop->dpp, tp->cpu, ret);
1714                                 mip->fs_size += ret;
1715                                 mip->fs_off += ret;
1716                                 nentries++;
1717                         } else if (ret == 0) {
1718                                 /*
1719                                  * A zero-length read after we're done
1720                                  * means the buffer is drained; stop reading.
1721                                  */
1722                                 if (tp->is_done)
1723                                         clear_events(pfd);
1724                         } else {
1725                                 read_err(tp->cpu, iop->ifn);
1726                                 if (errno != EAGAIN || tp->is_done)
1727                                         clear_events(pfd);
1728                         }
1729                         nevs--;
1730                 }
1731         }
1732
1733         return nentries;
1734 }
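
/*
 * Editor's sketch (not part of the build): the drain pattern
 * handle_pfds_file() implements above, reduced to a plain heap buffer.
 * Keep read(2)-ing the non-blocking relay fd until it returns 0 (trace
 * stopped and fully drained) or -1/EAGAIN (nothing queued right now).
 * The real code appends into an mmap()ed window of the output file via
 * setup_mmap() instead of a flat buffer.
 */
#if 0
static ssize_t example_drain(int ifd, void *buf, size_t len)
{
	ssize_t total = 0;

	for (;;) {
		ssize_t ret = read(ifd, buf, len);

		if (ret > 0)
			total += ret;		/* got a chunk; try for more */
		else if (ret == 0)
			break;			/* drained after trace stop */
		else if (errno == EAGAIN)
			break;			/* no data buffered yet */
		else
			return -1;		/* genuine read error */
	}
	return total;
}
#endif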
1735
1736 static int handle_pfds_netclient(struct tracer *tp, int nevs, int force_read)
1737 {
1738         struct stat sb;
1739         int i, nentries = 0;
1740         struct pollfd *pfd = tp->pfds;
1741         struct io_info *iop = tp->ios;
1742
1743         for (i = 0; i < ndevs; i++, pfd++, iop++) {
1744                 if (pfd->revents & POLLIN || force_read) {
1745                         if (fstat(iop->ifd, &sb) < 0) {
1746                                 perror(iop->ifn);
1747                                 pfd->events = 0;
1748                         } else if (sb.st_size > (off_t)iop->data_queued) {
1749                                 iop->ready = sb.st_size - iop->data_queued;
1750                                 iop->data_queued = sb.st_size;
1751
1752                                 if (!net_sendfile_data(tp, iop)) {
1753                                         pdc_dr_update(iop->dpp, tp->cpu,
1754                                                       iop->ready);
1755                                         nentries++;
1756                                 } else
1757                                         clear_events(pfd);
1758                         }
1759                         if (--nevs == 0)
1760                                 break;
1761                 }
1762         }
1763
1764         if (nentries)
1765                 incr_entries(nentries);
1766
1767         return nentries;
1768 }
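
/*
 * Editor's sketch (not part of the build): the fstat()+sendfile() idea
 * behind handle_pfds_netclient(), reduced to one file. sendfile(2) has
 * the kernel copy file pages straight to the socket, so the trace data
 * never crosses into user space; net_sendfile_data() wraps this with
 * the blktrace network header. "ofd" is assumed to be a connected TCP
 * socket.
 */
#if 0
static int example_ship_new_data(int ofd, int ifd, off_t *sent)
{
	struct stat sb;

	if (fstat(ifd, &sb) < 0)
		return -1;

	while (*sent < sb.st_size) {
		/* sendfile() advances *sent by the number of bytes shipped */
		ssize_t ret = sendfile(ofd, ifd, sent, sb.st_size - *sent);

		if (ret <= 0)
			return -1;
	}
	return 0;
}
#endif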
1769
1770 static int handle_pfds_entries(struct tracer *tp, int nevs, int force_read)
1771 {
1772         int i, nentries = 0;
1773         struct trace_buf *tbp;
1774         struct pollfd *pfd = tp->pfds;
1775         struct io_info *iop = tp->ios;
1776
1777         tbp = alloc_trace_buf(tp->cpu, buf_size);
1778         for (i = 0; i < ndevs; i++, pfd++, iop++) {
1779                 if (pfd->revents & POLLIN || force_read) {
1780                         tbp->len = read(iop->ifd, tbp->buf, buf_size);
1781                         if (tbp->len > 0) {
1782                                 pdc_dr_update(iop->dpp, tp->cpu, tbp->len);
1783                                 add_trace_buf(iop->dpp, tp->cpu, &tbp);
1784                                 nentries++;
1785                         } else if (tbp->len == 0) {
1786                                 /*
1787                                  * A zero-length read after we're done
1788                                  * means the buffer is drained; stop reading.
1789                                  */
1790                                 if (tp->is_done)
1791                                         clear_events(pfd);
1792                         } else {
1793                                 read_err(tp->cpu, iop->ifn);
1794                                 if (errno != EAGAIN || tp->is_done)
1795                                         clear_events(pfd);
1796                         }
1797                         if (!piped_output && --nevs == 0)
1798                                 break;
1799                 }
1800         }
1801         free(tbp);
1802
1803         if (nentries)
1804                 incr_entries(nentries);
1805
1806         return nentries;
1807 }
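
/*
 * Editor's sketch (not part of the build): why add_trace_buf() is
 * handed &tbp above. The filled buffer is queued for the processing
 * thread and the tracer continues with a freshly allocated one, so no
 * trace data is ever copied between threads -- only the pointer moves.
 * example_queue_buf() is a hypothetical stand-in for the locked
 * list_add_tail() the real code performs.
 */
#if 0
static void example_hand_off(struct trace_buf **tbpp, int cpu)
{
	struct trace_buf *full = *tbpp;

	example_queue_buf(full);		/* processing thread owns it now */
	*tbpp = alloc_trace_buf(cpu, buf_size);	/* tracer gets a fresh buffer */
}
#endif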
1808
1809 static void *thread_main(void *arg)
1810 {
1811         int ret, ndone, to_val;
1812         struct tracer *tp = arg;
1813
1814         ret = lock_on_cpu(tp->cpu);
1815         if (ret)
1816                 goto err;
1817
1818         ret = open_ios(tp);
1819         if (ret)
1820                 goto err;
1821
1822         if (piped_output)
1823                 to_val = 50;            /* Frequent partial handles */
1824         else
1825                 to_val = 500;           /* 1/2 second intervals */
1826
1827
1828         tracer_signal_ready(tp, Th_running, 0);
1829         tracer_wait_unblock(tp);
1830
1831         while (!tp->is_done) {
1832                 ndone = poll(tp->pfds, ndevs, to_val);
1833                 if (ndone < 0 && errno != EINTR)
1834                         fprintf(stderr, "Thread %d poll failed: %d/%s\n",
1835                                 tp->cpu, errno, strerror(errno));
1836                 else if (ndone || piped_output)
1837                         (void)handle_pfds(tp, ndone, piped_output);
1838         }
1839
1840         /*
1841          * Trace is stopped, pull data until we get a short read
1842          */
1843         while (handle_pfds(tp, ndevs, 1) > 0)
1844                 ;
1845
1846         close_ios(tp);
1847         tracer_signal_ready(tp, Th_leaving, 0);
1848         return NULL;
1849
1850 err:
1851         tracer_signal_ready(tp, Th_error, ret);
1852         return NULL;
1853 }
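
/*
 * Editor's sketch (not part of the build): what a lock_on_cpu()
 * implementation typically looks like -- pin the calling thread so
 * each tracer drains the relay buffers of exactly one CPU. Assumes
 * _GNU_SOURCE so <sched.h> exposes cpu_set_t and sched_setaffinity().
 */
#if 0
static int example_lock_on_cpu(int cpu)
{
	cpu_set_t cpu_mask;

	CPU_ZERO(&cpu_mask);
	CPU_SET(cpu, &cpu_mask);

	/* pid 0 == the calling thread */
	if (sched_setaffinity(0, sizeof(cpu_mask), &cpu_mask) < 0)
		return errno;
	return 0;
}
#endif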
1854
1855 static int start_tracer(int cpu)
1856 {
1857         struct tracer *tp;
1858
1859         tp = malloc(sizeof(*tp));
1860         memset(tp, 0, sizeof(*tp));
1861
1862         INIT_LIST_HEAD(&tp->head);
1863         tp->status = 0;
1864         tp->cpu = cpu;
1865
1866         if (pthread_create(&tp->thread, NULL, thread_main, tp)) {
1867                 fprintf(stderr, "FAILED to start thread on CPU %d: %d/%s\n",
1868                         cpu, errno, strerror(errno));
1869                 free(tp);
1870                 return 1;
1871         }
1872
1873         list_add_tail(&tp->head, &tracers);
1874         return 0;
1875 }
1876
1877 static void start_tracers(void)
1878 {
1879         int cpu;
1880         struct list_head *p;
1881
1882         for (cpu = 0; cpu < ncpus; cpu++)
1883                 if (start_tracer(cpu))
1884                         break;
1885
1886         wait_tracers_ready(cpu);
1887
1888         __list_for_each(p, &tracers) {
1889                 struct tracer *tp = list_entry(p, struct tracer, head);
1890                 if (tp->status)
1891                         fprintf(stderr,
1892                                 "FAILED to start thread on CPU %d: %d/%s\n",
1893                                 tp->cpu, tp->status, strerror(tp->status));
1894         }
1895 }
1896
1897 static void stop_tracers(void)
1898 {
1899         struct list_head *p;
1900
1901         /*
1902          * Stop the tracing - this makes the tracer threads clean up more quickly.
1903          */
1904         __list_for_each(p, &devpaths) {
1905                 struct devpath *dpp = list_entry(p, struct devpath, head);
1906                 (void)ioctl(dpp->fd, BLKTRACESTOP);
1907         }
1908
1909         /*
1910          * Tell each tracer to quit
1911          */
1912         __list_for_each(p, &tracers) {
1913                 struct tracer *tp = list_entry(p, struct tracer, head);
1914                 tp->is_done = 1;
1915         }
1916         pthread_cond_broadcast(&mt_cond);
1917 }
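
/*
 * Editor's sketch (not part of the build): the full ioctl lifecycle
 * that setup_buts()/start_buts()/stop_tracers() drive. BLKTRACESETUP,
 * BLKTRACESTART, BLKTRACESTOP and BLKTRACETEARDOWN come from
 * <linux/fs.h>, and struct blk_user_trace_setup from
 * <linux/blktrace_api.h>. Error handling trimmed for brevity.
 */
#if 0
static int example_trace_lifecycle(int fd, unsigned int bufsz,
				   unsigned int bufnr)
{
	struct blk_user_trace_setup buts;

	memset(&buts, 0, sizeof(buts));
	buts.buf_size = bufsz;			/* bytes per relay sub-buffer */
	buts.buf_nr = bufnr;			/* number of sub-buffers */
	buts.act_mask = 0xffff;			/* trace all action types */

	if (ioctl(fd, BLKTRACESETUP, &buts) < 0) /* creates the relay files */
		return errno;
	if (ioctl(fd, BLKTRACESTART) < 0)	/* events start flowing */
		return errno;

	/*
	 * ... consume <debugfs>/block/<buts.name>/trace<cpu>; the kernel
	 * fills in buts.name during BLKTRACESETUP ...
	 */

	(void)ioctl(fd, BLKTRACESTOP);		/* quiesce */
	(void)ioctl(fd, BLKTRACETEARDOWN);	/* free the relay buffers */
	return 0;
}
#endif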
1918
1919 static void del_tracers(void)
1920 {
1921         struct list_head *p, *q;
1922
1923         list_for_each_safe(p, q, &tracers) {
1924                 struct tracer *tp = list_entry(p, struct tracer, head);
1925
1926                 list_del(&tp->head);
1927                 free(tp);
1928         }
1929 }
1930
1931 static void wait_tracers(void)
1932 {
1933         struct list_head *p;
1934
1935         if (use_tracer_devpaths())
1936                 process_trace_bufs();
1937
1938         wait_tracers_leaving();
1939
1940         __list_for_each(p, &tracers) {
1941                 int ret;
1942                 struct tracer *tp = list_entry(p, struct tracer, head);
1943
1944                 ret = pthread_join(tp->thread, NULL);
1945                 if (ret)
1946                         fprintf(stderr, "Thread join %d failed %d\n",
1947                                 tp->cpu, ret);
1948         }
1949
1950         if (use_tracer_devpaths())
1951                 clean_trace_bufs();
1952
1953         get_all_drops();
1954 }
1955
1956 static void exit_tracing(void)
1957 {
1958         signal(SIGINT, SIG_IGN);
1959         signal(SIGHUP, SIG_IGN);
1960         signal(SIGTERM, SIG_IGN);
1961         signal(SIGALRM, SIG_IGN);
1962
1963         stop_tracers();
1964         wait_tracers();
1965         del_tracers();
1966         rel_devpaths();
1967 }
1968
1969 static void handle_sigint(__attribute__((__unused__)) int sig)
1970 {
1971         done = 1;
1972         stop_tracers();
1973 }
1974
1975 static void show_stats(struct list_head *devpaths)
1976 {
1977         FILE *ofp;
1978         struct list_head *p;
1979         unsigned long long nevents, data_read;
1980         unsigned long long total_drops = 0;
1981         unsigned long long total_events = 0;
1982
1983         if (piped_output)
1984                 ofp = my_fopen("/dev/null", "w");
1985         else
1986                 ofp = stdout;
1987
1988         __list_for_each(p, devpaths) {
1989                 int cpu;
1990                 struct pdc_stats *sp;
1991                 struct devpath *dpp = list_entry(p, struct devpath, head);
1992
1993                 if (net_mode == Net_server)
1994                         printf("server: end of run for %s:%s\n",
1995                                 dpp->ch->hostname, dpp->buts_name);
1996
1997                 data_read = 0;
1998                 nevents = 0;
1999
2000                 fprintf(ofp, "=== %s ===\n", dpp->buts_name);
2001                 for (cpu = 0, sp = dpp->stats; cpu < dpp->ncpus; cpu++, sp++) {
2002                         /*
2003                          * Estimate events if not known...
2004                          */
2005                         if (sp->nevents == 0) {
2006                                 sp->nevents = sp->data_read /
2007                                                 sizeof(struct blk_io_trace);
2008                         }
2009
2010                         fprintf(ofp,
2011                                 "  CPU%3d: %20llu events, %8llu KiB data\n",
2012                                 cpu, sp->nevents, (sp->data_read + 1023) >> 10);
2013
2014                         data_read += sp->data_read;
2015                         nevents += sp->nevents;
2016                 }
2017
2018                 fprintf(ofp, "  Total:  %20llu events (dropped %llu),"
2019                              " %8llu KiB data\n", nevents,
2020                              dpp->drops, (data_read + 1023) >> 10);
2021
2022                 total_drops += dpp->drops;
2023                 total_events += (nevents + dpp->drops);
2024         }
2025
2026         fflush(ofp);
2027         if (piped_output)
2028                 fclose(ofp);
2029
2030         if (total_drops) {
2031                 double drops_ratio = 1.0;
2032
2033                 if (total_events)
2034                         drops_ratio = (double)total_drops/(double)total_events;
2035
2036                 fprintf(stderr, "\nYou have %llu (%5.1lf%%) dropped events\n"
2037                                 "Consider using a larger buffer size (-b) "
2038                                 "and/or more buffers (-n)\n",
2039                         total_drops, 100.0 * drops_ratio);
2040         }
2041 }
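
/*
 * Editor's note on the KiB arithmetic above: (bytes + 1023) >> 10 is
 * integer division by 1024 rounded up, e.g. 1 byte -> 1 KiB,
 * 1024 bytes -> 1 KiB, 1025 bytes -> 2 KiB.
 */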
2042
2043 static int handle_args(int argc, char *argv[])
2044 {
2045         int c, i;
2046         struct statfs st;
2047         int act_mask_tmp = 0;
2048
2049         while ((c = getopt_long(argc, argv, S_OPTS, l_opts, NULL)) >= 0) {
2050                 switch (c) {
2051                 case 'a':
2052                         i = find_mask_map(optarg);
2053                         if (i < 0) {
2054                                 fprintf(stderr, "Invalid action mask %s\n",
2055                                         optarg);
2056                                 return 1;
2057                         }
2058                         act_mask_tmp |= i;
2059                         break;
2060
2061                 case 'A':
2062                         if ((sscanf(optarg, "%x", &i) != 1) ||
2063                                                         !valid_act_opt(i)) {
2064                                 fprintf(stderr,
2065                                         "Invalid set action mask %s/0x%x\n",
2066                                         optarg, i);
2067                                 return 1;
2068                         }
2069                         act_mask_tmp = i;
2070                         break;
2071
2072                 case 'd':
2073                         if (add_devpath(optarg) != 0)
2074                                 return 1;
2075                         break;
2076
2077                 case 'I': {
2078                         char dev_line[256];
2079                         FILE *ifp = my_fopen(optarg, "r");
2080
2081                         if (!ifp) {
2082                                 fprintf(stderr,
2083                                         "Invalid file for devices %s\n",
2084                                         optarg);
2085                                 return 1;
2086                         }
2087
2088                         while (fscanf(ifp, "%255s\n", dev_line) == 1) {
2089                                 if (add_devpath(dev_line) != 0) {
2090                                         fclose(ifp);
2091                                         return 1;
2092                                 }
2093                         }
2094                         fclose(ifp);
2095                         break;
2096                 }
2097
2098                 case 'r':
2099                         debugfs_path = optarg;
2100                         break;
2101
2102                 case 'o':
2103                         output_name = optarg;
2104                         break;
2105                 case 'k':
2106                         kill_running_trace = 1;
2107                         break;
2108                 case 'w':
2109                         stop_watch = atoi(optarg);
2110                         if (stop_watch <= 0) {
2111                                 fprintf(stderr,
2112                                         "Invalid stopwatch value (%d secs)\n",
2113                                         stop_watch);
2114                                 return 1;
2115                         }
2116                         break;
2117                 case 'V':
2118                 case 'v':
2119                         printf("%s version %s\n", argv[0], blktrace_version);
2120                         exit(0);
2121                         /*NOTREACHED*/
2122                 case 'b':
2123                         buf_size = strtoul(optarg, NULL, 10);
2124                         if (buf_size <= 0 || buf_size > 16*1024) {
2125                                 fprintf(stderr, "Invalid buffer size (%lu)\n",
2126                                         buf_size);
2127                                 return 1;
2128                         }
2129                         buf_size <<= 10;
2130                         break;
2131                 case 'n':
2132                         buf_nr = strtoul(optarg, NULL, 10);
2133                         if (buf_nr <= 0) {
2134                                 fprintf(stderr,
2135                                         "Invalid buffer nr (%lu)\n", buf_nr);
2136                                 return 1;
2137                         }
2138                         break;
2139                 case 'D':
2140                         output_dir = optarg;
2141                         break;
2142                 case 'h':
2143                         net_mode = Net_client;
2144                         memset(hostname, 0, sizeof(hostname));
2145                         strncpy(hostname, optarg, sizeof(hostname));
2146                         hostname[sizeof(hostname) - 1] = '\0';
2147                         break;
2148                 case 'l':
2149                         net_mode = Net_server;
2150                         break;
2151                 case 'p':
2152                         net_port = atoi(optarg);
2153                         break;
2154                 case 's':
2155                         net_use_sendfile = 0;
2156                         break;
2157                 default:
2158                         show_usage(argv[0]);
2159                         exit(1);
2160                         /*NOTREACHED*/
2161                 }
2162         }
2163
2164         while (optind < argc)
2165                 if (add_devpath(argv[optind++]) != 0)
2166                         return 1;
2167
2168         if (net_mode != Net_server && ndevs == 0) {
2169                 show_usage(argv[0]);
2170                 return 1;
2171         }
2172
2173         if (statfs(debugfs_path, &st) < 0) {
2174                 fprintf(stderr, "Invalid debug path %s: %d/%s\n",
2175                         debugfs_path, errno, strerror(errno));
2176                 return 1;
2177         }
2178
2179         if (st.f_type != (long)DEBUGFS_TYPE) {
2180                 fprintf(stderr, "Debugfs is not mounted at %s\n", debugfs_path);
2181                 return 1;
2182         }
2183
2184         if (act_mask_tmp != 0)
2185                 act_mask = act_mask_tmp;
2186
2187         if (net_mode == Net_client && net_setup_addr())
2188                 return 1;
2189
2190         /*
2191          * Set up for appropriate PFD handler based upon output name.
2192          */
2193         if (net_client_use_sendfile())
2194                 handle_pfds = handle_pfds_netclient;
2195         else if (net_client_use_send())
2196                 handle_pfds = handle_pfds_entries;
2197         else if (output_name && (strcmp(output_name, "-") == 0)) {
2198                 piped_output = 1;
2199                 handle_pfds = handle_pfds_entries;
2200                 pfp = stdout;
2201                 if (setvbuf(pfp, NULL, _IONBF, 0)) {
2202                         perror("setvbuf stdout");
2203                         return 1;
2204                 }
2205         } else
2206                 handle_pfds = handle_pfds_file;
2207         return 0;
2208 }
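
/*
 * Editor's note -- example invocations accepted by the parser above
 * (device and host names hypothetical):
 *
 *	blktrace -d /dev/sda -o - | blkparse -i -	# piped output
 *	blktrace -d /dev/sda -b 1024 -n 8		# 1 MiB x 8 relay buffers
 *	blktrace -l					# run as network server
 *	blktrace -d /dev/sda -h storagebox		# ship traces to a server
 */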
2209
2210 static void ch_add_connection(struct net_server_s *ns, struct cl_host *ch,
2211                               int fd)
2212 {
2213         struct cl_conn *nc;
2214
2215         nc = malloc(sizeof(*nc));
2216         memset(nc, 0, sizeof(*nc));
2217
2218         time(&nc->connect_time);
2219         nc->ch = ch;
2220         nc->fd = fd;
2221         nc->ncpus = -1;
2222
2223         list_add_tail(&nc->ch_head, &ch->conn_list);
2224         ch->connects++;
2225
2226         list_add_tail(&nc->ns_head, &ns->conn_list);
2227         ns->connects++;
2228         ns->pfds = realloc(ns->pfds, (ns->connects+1) * sizeof(struct pollfd));
2229 }
2230
2231 static void ch_rem_connection(struct net_server_s *ns, struct cl_host *ch,
2232                               struct cl_conn *nc)
2233 {
2234         net_close_connection(&nc->fd);
2235
2236         list_del(&nc->ch_head);
2237         ch->connects--;
2238
2239         list_del(&nc->ns_head);
2240         ns->connects--;
2241         ns->pfds = realloc(ns->pfds, (ns->connects+1) * sizeof(struct pollfd));
2242
2243         free(nc);
2244 }
2245
2246 static struct cl_host *net_find_client_host(struct net_server_s *ns,
2247                                             struct in_addr cl_in_addr)
2248 {
2249         struct list_head *p;
2250
2251         __list_for_each(p, &ns->ch_list) {
2252                 struct cl_host *ch = list_entry(p, struct cl_host, head);
2253
2254                 if (in_addr_eq(ch->cl_in_addr, cl_in_addr))
2255                         return ch;
2256         }
2257
2258         return NULL;
2259 }
2260
2261 static struct cl_host *net_add_client_host(struct net_server_s *ns,
2262                                            struct sockaddr_in *addr)
2263 {
2264         struct cl_host *ch;
2265
2266         ch = malloc(sizeof(*ch));
2267         memset(ch, 0, sizeof(*ch));
2268
2269         ch->ns = ns;
2270         ch->cl_in_addr = addr->sin_addr;
2271         list_add_tail(&ch->head, &ns->ch_list);
2272         ns->nchs++;
2273
2274         ch->hostname = strdup(inet_ntoa(addr->sin_addr));
2275         printf("server: connection from %s\n", ch->hostname);
2276
2277         INIT_LIST_HEAD(&ch->conn_list);
2278         INIT_LIST_HEAD(&ch->devpaths);
2279
2280         return ch;
2281 }
2282
2283 static void device_done(struct devpath *dpp, int ncpus)
2284 {
2285         int cpu;
2286         struct io_info *iop;
2287
2288         for (cpu = 0, iop = dpp->ios; cpu < ncpus; cpu++, iop++)
2289                 close_iop(iop);
2290
2291         list_del(&dpp->head);
2292         dpp_free(dpp);
2293 }
2294
2295 static void net_ch_remove(struct cl_host *ch, int ncpus)
2296 {
2297         struct list_head *p, *q;
2298         struct net_server_s *ns = ch->ns;
2299
2300         list_for_each_safe(p, q, &ch->devpaths) {
2301                 struct devpath *dpp = list_entry(p, struct devpath, head);
2302                 device_done(dpp, ncpus);
2303         }
2304
2305         list_for_each_safe(p, q, &ch->conn_list) {
2306                 struct cl_conn *nc = list_entry(p, struct cl_conn, ch_head);
2307
2308                 ch_rem_connection(ns, ch, nc);
2309         }
2310
2311         list_del(&ch->head);
2312         ns->nchs--;
2313
2314         if (ch->hostname)
2315                 free(ch->hostname);
2316         free(ch);
2317 }
2318
2319 static void net_add_connection(struct net_server_s *ns)
2320 {
2321         int fd;
2322         struct cl_host *ch;
2323         socklen_t socklen = sizeof(ns->addr);
2324
2325         fd = my_accept(ns->listen_fd, (struct sockaddr *)&ns->addr, &socklen);
2326         if (fd < 0) {
2327                 /*
2328                  * This is OK: we just won't accept this connection,
2329                  * nothing fatal.
2330                  */
2331                 perror("accept");
2332         } else {
2333                 ch = net_find_client_host(ns, ns->addr.sin_addr);
2334                 if (!ch)
2335                         ch = net_add_client_host(ns, &ns->addr);
2336
2337                 ch_add_connection(ns, ch, fd);
2338         }
2339 }
2340
2341 static struct devpath *nc_add_dpp(struct cl_conn *nc,
2342                                   struct blktrace_net_hdr *bnh,
2343                                   time_t connect_time)
2344 {
2345         int cpu;
2346         struct io_info *iop;
2347         struct devpath *dpp;
2348
2349         dpp = malloc(sizeof(*dpp));
2350         memset(dpp, 0, sizeof(*dpp));
2351
2352         dpp->buts_name = strdup(bnh->buts_name);
2353         dpp->path = strdup(bnh->buts_name);
2354         dpp->fd = -1;
2355         dpp->ch = nc->ch;
2356         dpp->cl_id = bnh->cl_id;
2357         dpp->cl_connect_time = connect_time;
2358         dpp->ncpus = nc->ncpus;
2359         dpp->stats = calloc(dpp->ncpus, sizeof(*dpp->stats));
2360         memset(dpp->stats, 0, dpp->ncpus * sizeof(*dpp->stats));
2361
2362         list_add_tail(&dpp->head, &nc->ch->devpaths);
2363         nc->ch->ndevs++;
2364
2365         dpp->ios = calloc(nc->ncpus, sizeof(*iop));
2366         memset(dpp->ios, 0, nc->ncpus * sizeof(*iop));
2367
2368         for (cpu = 0, iop = dpp->ios; cpu < nc->ncpus; cpu++, iop++) {
2369                 iop->dpp = dpp;
2370                 iop->nc = nc;
2371                 init_mmap_info(&iop->mmap_info);
2372
2373                 if (iop_open(iop, cpu))
2374                         goto err;
2375         }
2376
2377         return dpp;
2378
2379 err:
2380         /*
2381          * Need to unravel what's been done...
2382          */
2383         while (cpu >= 0)
2384                 close_iop(&dpp->ios[cpu--]);
2385         dpp_free(dpp);
2386
2387         return NULL;
2388 }
2389
2390 static struct devpath *nc_find_dpp(struct cl_conn *nc,
2391                                    struct blktrace_net_hdr *bnh)
2392 {
2393         struct list_head *p;
2394         time_t connect_time = nc->connect_time;
2395
2396         __list_for_each(p, &nc->ch->devpaths) {
2397                 struct devpath *dpp = list_entry(p, struct devpath, head);
2398
2399                 if (!strcmp(dpp->buts_name, bnh->buts_name))
2400                         return dpp;
2401
2402                 if (dpp->cl_id == bnh->cl_id)
2403                         connect_time = dpp->cl_connect_time;
2404         }
2405
2406         return nc_add_dpp(nc, bnh, connect_time);
2407 }
2408
2409 static void net_client_read_data(struct cl_conn *nc, struct devpath *dpp,
2410                                  struct blktrace_net_hdr *bnh)
2411 {
2412         int ret;
2413         struct io_info *iop = &dpp->ios[bnh->cpu];
2414         struct mmap_info *mip = &iop->mmap_info;
2415
2416         if (setup_mmap(iop->ofd, bnh->len, &iop->mmap_info, NULL)) {
2417                 fprintf(stderr, "ncd(%s:%d): mmap failed\n",
2418                         nc->ch->hostname, nc->fd);
2419                 exit(1);
2420         }
2421
2422         ret = net_recv_data(nc->fd, mip->fs_buf + mip->fs_off, bnh->len);
2423         if (ret > 0) {
2424                 pdc_dr_update(dpp, bnh->cpu, ret);
2425                 mip->fs_size += ret;
2426                 mip->fs_off += ret;
2427         } else if (ret < 0)
2428                 exit(1);
2429 }
2430
2431 /*
2432  * Returns 1 if we closed a host - invalidates other polling information
2433  * that may be present.
2434  */
2435 static int net_client_data(struct cl_conn *nc)
2436 {
2437         int ret;
2438         struct devpath *dpp;
2439         struct blktrace_net_hdr bnh;
2440
2441         ret = net_get_header(nc, &bnh);
2442         if (ret == 0)
2443                 return 0;
2444
2445         if (ret < 0) {
2446                 fprintf(stderr, "ncd(%d): header read failed\n", nc->fd);
2447                 exit(1);
2448         }
2449
2450         if (data_is_native == -1 && check_data_endianness(bnh.magic)) {
2451                 fprintf(stderr, "ncd(%d): received data is bad\n", nc->fd);
2452                 exit(1);
2453         }
2454
2455         if (!data_is_native) {
2456                 bnh.magic = be32_to_cpu(bnh.magic);
2457                 bnh.cpu = be32_to_cpu(bnh.cpu);
2458                 bnh.max_cpus = be32_to_cpu(bnh.max_cpus);
2459                 bnh.len = be32_to_cpu(bnh.len);
2460                 bnh.cl_id = be32_to_cpu(bnh.cl_id);
2461                 bnh.buf_size = be32_to_cpu(bnh.buf_size);
2462                 bnh.buf_nr = be32_to_cpu(bnh.buf_nr);
2463                 bnh.page_size = be32_to_cpu(bnh.page_size);
2464         }
2465
2466         if ((bnh.magic & 0xffffff00) != BLK_IO_TRACE_MAGIC) {
2467                 fprintf(stderr, "ncd(%s:%d): bad data magic\n",
2468                         nc->ch->hostname, nc->fd);
2469                 exit(1);
2470         }
2471
2472         if (nc->ncpus == -1)
2473                 nc->ncpus = bnh.max_cpus;
2474
2475         /*
2476          * len == 0 means the other end is sending us a new connection/dpp
2477          * len == 1 means that the other end signalled end-of-run
2478          */
2479         dpp = nc_find_dpp(nc, &bnh);
2480         if (bnh.len == 0) {
2481                 /*
2482                  * Just adding in the dpp above is enough
2483                  */
2484                 ack_open_close(nc->fd, dpp->buts_name);
2485                 nc->ch->cl_opens++;
2486         } else if (bnh.len == 1) {
2487                 /*
2488                  * overload cpu count with dropped events
2489                  */
2490                 dpp->drops = bnh.cpu;
2491
2492                 ack_open_close(nc->fd, dpp->buts_name);
2493                 if (--nc->ch->cl_opens == 0) {
2494                         show_stats(&nc->ch->devpaths);
2495                         net_ch_remove(nc->ch, nc->ncpus);
2496                         return 1;
2497                 }
2498         } else
2499                 net_client_read_data(nc, dpp, &bnh);
2500
2501         return 0;
2502 }
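
/*
 * Editor's sketch (not part of the build): one way the endianness
 * probe used above can work. The low byte of the magic carries the
 * format version, so only the top 24 bits are compared: if the raw
 * value matches, sender and receiver share a byte order; if the
 * byte-swapped value matches, every header field must be converted.
 * This only illustrates the idea behind check_data_endianness().
 */
#if 0
static int example_check_endianness(u32 magic, int *native)
{
	u32 swapped = ((magic & 0x000000ff) << 24) |
		      ((magic & 0x0000ff00) << 8)  |
		      ((magic & 0x00ff0000) >> 8)  |
		      ((magic & 0xff000000) >> 24);

	if ((magic & 0xffffff00) == BLK_IO_TRACE_MAGIC) {
		*native = 1;		/* same byte order as sender */
		return 0;
	}
	if ((swapped & 0xffffff00) == BLK_IO_TRACE_MAGIC) {
		*native = 0;		/* must convert every field */
		return 0;
	}
	return 1;			/* neither matched: corrupt stream */
}
#endif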
2503
2504 static void handle_client_data(struct net_server_s *ns, int events)
2505 {
2506         struct cl_conn *nc;
2507         struct pollfd *pfd;
2508         struct list_head *p, *q;
2509
2510         pfd = &ns->pfds[1];
2511         list_for_each_safe(p, q, &ns->conn_list) {
2512                 if (pfd->revents & POLLIN) {
2513                         nc = list_entry(p, struct cl_conn, ns_head);
2514
2515                         if (net_client_data(nc) || --events == 0)
2516                                 break;
2517                 }
2518                 pfd++;
2519         }
2520 }
2521
2522 static void net_setup_pfds(struct net_server_s *ns)
2523 {
2524         struct pollfd *pfd;
2525         struct list_head *p;
2526
2527         ns->pfds[0].fd = ns->listen_fd;
2528         ns->pfds[0].events = POLLIN;
2529
2530         pfd = &ns->pfds[1];
2531         __list_for_each(p, &ns->conn_list) {
2532                 struct cl_conn *nc = list_entry(p, struct cl_conn, ns_head);
2533
2534                 pfd->fd = nc->fd;
2535                 pfd->events = POLLIN;
2536                 pfd++;
2537         }
2538 }
2539
2540 static int net_server_handle_connections(struct net_server_s *ns)
2541 {
2542         int events;
2543
2544         printf("server: waiting for connections...\n");
2545
2546         while (!done) {
2547                 net_setup_pfds(ns);
2548                 events = poll(ns->pfds, ns->connects + 1, -1);
2549                 if (events < 0) {
2550                         if (errno != EINTR) {
2551                                 perror("FATAL: poll error");
2552                                 return 1;
2553                         }
2554                 } else if (events > 0) {
2555                         if (ns->pfds[0].revents & POLLIN) {
2556                                 net_add_connection(ns);
2557                                 events--;
2558                         }
2559
2560                         if (events)
2561                                 handle_client_data(ns, events);
2562                 }
2563         }
2564
2565         return 0;
2566 }
2567
2568 static int net_server(void)
2569 {
2570         int fd, opt;
2571         int ret = 1;
2572         struct net_server_s net_server;
2573         struct net_server_s *ns = &net_server;
2574
2575         memset(ns, 0, sizeof(*ns));
2576         INIT_LIST_HEAD(&ns->ch_list);
2577         INIT_LIST_HEAD(&ns->conn_list);
2578         ns->pfds = malloc(sizeof(struct pollfd));
2579
2580         fd = my_socket(AF_INET, SOCK_STREAM, 0);
2581         if (fd < 0) {
2582                 perror("server: socket");
2583                 goto out;
2584         }
2585
2586         opt = 1;
2587         if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) {
2588                 perror("setsockopt");
2589                 goto out;
2590         }
2591
2592         memset(&ns->addr, 0, sizeof(ns->addr));
2593         ns->addr.sin_family = AF_INET;
2594         ns->addr.sin_addr.s_addr = htonl(INADDR_ANY);
2595         ns->addr.sin_port = htons(net_port);
2596
2597         if (bind(fd, (struct sockaddr *) &ns->addr, sizeof(ns->addr)) < 0) {
2598                 perror("bind");
2599                 goto out;
2600         }
2601
2602         if (listen(fd, 1) < 0) {
2603                 perror("listen");
2604                 goto out;
2605         }
2606
2607         /*
2608          * The actual server looping is done here:
2609          */
2610         ns->listen_fd = fd;
2611         ret = net_server_handle_connections(ns);
2612
2613         /*
2614          * Clean up and return...
2615          */
2616 out:
2617         free(ns->pfds);
2618         return ret;
2619 }
2620
2621 static int run_tracers(void)
2622 {
2623         atexit(exit_tracing);
2624         if (net_mode == Net_client)
2625                 printf("blktrace: connecting to %s\n", hostname);
2626
2627         setup_buts();
2628
2629         if (use_tracer_devpaths()) {
2630                 if (setup_tracer_devpaths())
2631                         return 1;
2632
2633                 if (piped_output)
2634                         handle_list = handle_list_file;
2635                 else
2636                         handle_list = handle_list_net;
2637         }
2638
2639         start_tracers();
2640         if (nthreads_running == ncpus) {
2641                 unblock_tracers();
2642                 start_buts();
2643                 if (net_mode == Net_client)
2644                         printf("blktrace: connected!\n");
2645                 if (stop_watch)
2646                         alarm(stop_watch);
2647         } else
2648                 stop_tracers();
2649
2650         wait_tracers();
2651         if (nthreads_running == ncpus)
2652                 show_stats(&devpaths);
2653         if (net_client_use_send())
2654                 close_client_connections();
2655         del_tracers();
2656
2657         return 0;
2658 }
2659
2660 int main(int argc, char *argv[])
2661 {
2662         int ret = 0;
2663
2664         setlocale(LC_NUMERIC, "en_US");
2665         pagesize = getpagesize();
2666         ncpus = sysconf(_SC_NPROCESSORS_CONF);
2667         if (ncpus < 0) {
2668                 fprintf(stderr, "sysconf(_SC_NPROCESSORS_CONF) failed %d/%s\n",
2669                         errno, strerror(errno));
2670                 ret = 1;
2671                 goto out;
2672         } else if (handle_args(argc, argv)) {
2673                 ret = 1;
2674                 goto out;
2675         }
2676
2677         if (ndevs > 1 && output_name && strcmp(output_name, "-") != 0) {
2678                 fprintf(stderr, "-o not supported with multiple devices\n");
2679                 ret = 1;
2680                 goto out;
2681         }
2682
2683         signal(SIGINT, handle_sigint);
2684         signal(SIGHUP, handle_sigint);
2685         signal(SIGTERM, handle_sigint);
2686         signal(SIGALRM, handle_sigint);
2687         signal(SIGPIPE, SIG_IGN);
2688
2689         if (kill_running_trace) {
2690                 struct devpath *dpp;
2691                 struct list_head *p;
2692
2693                 __list_for_each(p, &devpaths) {
2694                         dpp = list_entry(p, struct devpath, head);
2695                         if (__stop_trace(dpp->fd)) {
2696                                 fprintf(stderr,
2697                                         "BLKTRACETEARDOWN %s failed: %d/%s\n",
2698                                         dpp->path, errno, strerror(errno));
2699                         }
2700                 }
2701         } else if (net_mode == Net_server) {
2702                 if (output_name) {
2703                         fprintf(stderr, "-o ignored in server mode\n");
2704                         output_name = NULL;
2705                 }
2706                 ret = net_server();
2707         } else
2708                 ret = run_tracers();
2709
2710 out:
2711         if (pfp)
2712                 fclose(pfp);
2713         rel_devpaths();
2714         return ret;
2715 }