/*
 * block queue tracing application
 *
 * Copyright (C) 2005 Jens Axboe <axboe@suse.de>
 * Copyright (C) 2006 Jens Axboe <axboe@kernel.dk>
 *
 * Rewrite to have a single thread per CPU (managing all devices on that CPU)
 *      Alan D. Brunelle <alan.brunelle@hp.com> - January 2009
 *
 *  This program is free software; you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License as published by
 *  the Free Software Foundation; either version 2 of the License, or
 *  (at your option) any later version.
 *
 *  This program is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *  GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License
 *  along with this program; if not, write to the Free Software
 *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 *
 */

#include <errno.h>
#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <fcntl.h>
#include <getopt.h>
#include <sched.h>
#include <unistd.h>
#include <poll.h>
#include <signal.h>
#include <pthread.h>
#include <locale.h>
#include <sys/ioctl.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/vfs.h>
#include <sys/mman.h>
#include <sys/param.h>
#include <sys/time.h>
#include <sys/resource.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <sys/sendfile.h>

#include "btt/list.h"
#include "blktrace.h"

/*
 * You may want to increase this even more, if you are logging at a high
 * rate and see skipped/missed events
 */
#define BUF_SIZE                (512 * 1024)
#define BUF_NR                  (4)

#define FILE_VBUF_SIZE          (128 * 1024)

#define DEBUGFS_TYPE            (0x64626720)
#define TRACE_NET_PORT          (8462)

enum {
        Net_none = 0,
        Net_server,
        Net_client,
};

enum thread_status {
        Th_running,
        Th_leaving,
        Th_error
};

/*
 * Generic stats collected: nevents can be _roughly_ estimated by data_read
 * (discounting pdu...)
 *
 * These fields are updated w/ pdc_dr_update & pdc_nev_update below.
 */
struct pdc_stats {
        unsigned long long data_read;
        unsigned long long nevents;
};

struct devpath {
        struct list_head head;
        char *path;                     /* path to device special file */
        char *buts_name;                /* name returned from bt kernel code */
        struct pdc_stats *stats;
        int fd, ncpus;
        unsigned long long drops;

        /*
         * For piped output only:
         *
         * Each tracer will have a tracer_devpath_head that it will add new
         * data onto. Its list is protected by tracer_devpath_head.mutex
         * (below), and it will signal the processing thread using the
         * dp_cond, dp_mutex & dp_entries variables (also below).
         */
        struct tracer_devpath_head *heads;

        /*
         * For network server mode only:
         */
        struct cl_host *ch;
        u32 cl_id;
        time_t cl_connect_time;
        struct io_info *ios;
};

/*
 * For piped output to stdout we will have each tracer thread (one per dev)
 * tack buffers read from the relay queues on a per-device list.
 *
 * The main thread will then collect trace buffers from each of these
 * lists in turn.
 *
 * We will use a mutex to guard each of the trace_buf lists. The tracers
 * can then signal the main thread using <dp_cond,dp_mutex> and
 * dp_entries. (When dp_entries is 0 and a tracer adds an entry, it will
 * signal; when dp_entries is 0, the main thread will wait for that
 * condition to be signalled.)
 *
 * adb: It may be better just to have a large buffer per tracer per dev,
 * and then use it as a ring-buffer. This would certainly cut down a lot
 * of malloc/free thrashing, at the cost of more memory movements
 * (potentially).
 */
struct trace_buf {
        struct list_head head;
        struct devpath *dpp;
        void *buf;
        int cpu, len;
};

struct tracer_devpath_head {
        pthread_mutex_t mutex;
        struct list_head head;
        struct trace_buf *prev;
};

/*
 * Used to handle the mmap() interfaces for output file (containing traces)
 */
struct mmap_info {
        void *fs_buf;
        unsigned long long fs_size, fs_max_size, fs_off, fs_buf_len;
        unsigned long buf_size, buf_nr;
        int pagesize;
};

/*
 * Each thread doing work on a (client) side of blktrace will have one
 * of these. The ios array contains input/output information, pfds holds
 * poll() data. The volatiles provide flags to/from the main executing
 * thread.
 */
struct tracer {
        struct list_head head;
        struct io_info *ios;
        struct pollfd *pfds;
        pthread_t thread;
        int cpu, nios;
        volatile int status, is_done;
};

/*
 * Networking stuff follows. We include a magic number so we know whether
 * we need to convert endianness or not.
 *
 * The len field is overloaded:
 *      0 - Indicates an "open" - allowing the server to set up for a dev/cpu
 *      1 - Indicates a "close" - Shut down the connection in an orderly manner
 *
 * The cpu field is overloaded on close: it will contain the number of drops.
 */
struct blktrace_net_hdr {
        u32 magic;              /* same as trace magic */
        char buts_name[32];     /* trace name */
        u32 cpu;                /* for which cpu */
        u32 max_cpus;
        u32 len;                /* length of following trace data */
        u32 cl_id;              /* id for set of client per-cpu connections */
        u32 buf_size;           /* client buf_size for this trace  */
        u32 buf_nr;             /* client buf_nr for this trace  */
        u32 page_size;          /* client page_size for this trace  */
};

/*
 * Each host encountered has one of these. The head is used to link this
 * on to the network server's ch_list. Connections associated with this
 * host are linked on conn_list, and any devices traced on that host
 * are connected on the devpaths list.
 */
struct cl_host {
        struct list_head head;
        struct list_head conn_list;
        struct list_head devpaths;
        struct net_server_s *ns;
        char *hostname;
        struct in_addr cl_in_addr;
        int connects, ndevs, cl_opens;
};

/*
 * Each connection (client to server socket ('fd')) has one of these. A
 * back reference to the host ('ch'), and list heads (for the host
 * list, and the network server conn_list) are also included.
 */
struct cl_conn {
        struct list_head ch_head, ns_head;
        struct cl_host *ch;
        int fd, ncpus;
        time_t connect_time;
};

/*
 * The network server requires some poll structures to be maintained -
 * one per connection currently on conn_list. The nchs/ch_list values
 * are for each host connected to this server. The addr field is used
 * for scratch as new connections are established.
 */
struct net_server_s {
        struct list_head conn_list;
        struct list_head ch_list;
        struct pollfd *pfds;
        int listen_fd, connects, nchs;
        struct sockaddr_in addr;
};

/*
 * This structure is (generically) used to provide information
 * for a read-to-write set of values.
 *
 * ifn & ifd represent input information
 *
 * ofn, ofd, ofp, obuf & mmap_info are (optionally) used for the output file.
 */
struct io_info {
        struct devpath *dpp;
        FILE *ofp;
        char *obuf;
        struct cl_conn *nc;     /* Server network connection */

        /*
         * mmap controlled output files
         */
        struct mmap_info mmap_info;

        /*
         * Client network fields
         */
        unsigned int ready;
        unsigned long long data_queued;

        /*
         * Input/output file descriptors & names
         */
        int ifd, ofd;
        char ifn[MAXPATHLEN + 64];
        char ofn[MAXPATHLEN + 64];
};

static char blktrace_version[] = "2.0.0";

/*
 * Linkage to blktrace helper routines (trace conversions)
 */
int data_is_native = -1;

static int ndevs;
static int ncpus;
static int pagesize;
static int act_mask = ~0U;
static int kill_running_trace;
static int stop_watch;
static int piped_output;

static char *debugfs_path = "/sys/kernel/debug";
static char *output_name;
static char *output_dir;

static unsigned long buf_size = BUF_SIZE;
static unsigned long buf_nr = BUF_NR;

static FILE *pfp;

static LIST_HEAD(devpaths);
static LIST_HEAD(tracers);

static volatile int done;

/*
 * tracer threads add entries, the main thread takes them off and processes
 * them. These protect the dp_entries variable.
 */
static pthread_cond_t dp_cond = PTHREAD_COND_INITIALIZER;
static pthread_mutex_t dp_mutex = PTHREAD_MUTEX_INITIALIZER;
static volatile int dp_entries;

/*
 * These synchronize master / thread interactions.
 */
static pthread_cond_t mt_cond = PTHREAD_COND_INITIALIZER;
static pthread_mutex_t mt_mutex = PTHREAD_MUTEX_INITIALIZER;
static volatile int nthreads_running;
static volatile int nthreads_leaving;
static volatile int nthreads_error;
static volatile int tracers_run;

/*
 * network cmd line params
 */
static struct sockaddr_in hostname_addr;
static char hostname[MAXHOSTNAMELEN];
static int net_port = TRACE_NET_PORT;
static int net_use_sendfile = 1;
static int net_mode;
static int *cl_fds;

static int (*handle_pfds)(struct tracer *, int, int);
static int (*handle_list)(struct tracer_devpath_head *, struct list_head *);

#define S_OPTS  "d:a:A:r:o:kw:vVb:n:D:lh:p:sI:"
static struct option l_opts[] = {
        {
                .name = "dev",
                .has_arg = required_argument,
                .flag = NULL,
                .val = 'd'
        },
        {
                .name = "input-devs",
                .has_arg = required_argument,
                .flag = NULL,
                .val = 'I'
        },
        {
                .name = "act-mask",
                .has_arg = required_argument,
                .flag = NULL,
                .val = 'a'
        },
        {
                .name = "set-mask",
                .has_arg = required_argument,
                .flag = NULL,
                .val = 'A'
        },
        {
                .name = "relay",
                .has_arg = required_argument,
                .flag = NULL,
                .val = 'r'
        },
        {
                .name = "output",
                .has_arg = required_argument,
                .flag = NULL,
                .val = 'o'
        },
        {
                .name = "kill",
                .has_arg = no_argument,
                .flag = NULL,
                .val = 'k'
        },
        {
                .name = "stopwatch",
                .has_arg = required_argument,
                .flag = NULL,
                .val = 'w'
        },
        {
                .name = "version",
                .has_arg = no_argument,
                .flag = NULL,
                .val = 'v'
        },
        {
                .name = "version",
                .has_arg = no_argument,
                .flag = NULL,
                .val = 'V'
        },
        {
                .name = "buffer-size",
                .has_arg = required_argument,
                .flag = NULL,
                .val = 'b'
        },
        {
                .name = "num-sub-buffers",
                .has_arg = required_argument,
                .flag = NULL,
                .val = 'n'
        },
        {
                .name = "output-dir",
                .has_arg = required_argument,
                .flag = NULL,
                .val = 'D'
        },
        {
                .name = "listen",
                .has_arg = no_argument,
                .flag = NULL,
                .val = 'l'
        },
        {
                .name = "host",
                .has_arg = required_argument,
                .flag = NULL,
                .val = 'h'
        },
        {
                .name = "port",
                .has_arg = required_argument,
                .flag = NULL,
                .val = 'p'
        },
        {
                .name = "no-sendfile",
                .has_arg = no_argument,
                .flag = NULL,
                .val = 's'
        },
        {
                .name = NULL,
        }
};

static char usage_str[] = "\n\n" \
        "-d <dev>             | --dev=<dev>\n" \
        "[ -r <debugfs path>  | --relay=<debugfs path> ]\n" \
        "[ -o <file>          | --output=<file>]\n" \
        "[ -D <dir>           | --output-dir=<dir>]\n" \
        "[ -w <time>          | --stopwatch=<time>]\n" \
        "[ -a <action field>  | --act-mask=<action field>]\n" \
        "[ -A <action mask>   | --set-mask=<action mask>]\n" \
        "[ -b <size>          | --buffer-size=<size>]\n" \
        "[ -n <number>        | --num-sub-buffers=<number>]\n" \
        "[ -l                 | --listen]\n" \
        "[ -h <hostname>      | --host=<hostname>]\n" \
        "[ -p <port number>   | --port=<port number>]\n" \
        "[ -s                 | --no-sendfile]\n" \
        "[ -I <devs file>     | --input-devs=<devs file>]\n" \
        "[ -v                 | --version]\n" \
        "[ -V                 | --version]\n" \

        "\t-d Use specified device. May also be given last after options\n" \
        "\t-r Path to mounted debugfs, defaults to /sys/kernel/debug\n" \
        "\t-o File(s) to send output to\n" \
        "\t-D Directory to prepend to output file names\n" \
        "\t-w Stop after defined time, in seconds\n" \
        "\t-a Only trace specified actions. See documentation\n" \
        "\t-A Give trace mask as a single value. See documentation\n" \
        "\t-b Sub buffer size in KiB (default 512)\n" \
        "\t-n Number of sub buffers (default 4)\n" \
        "\t-l Run in network listen mode (blktrace server)\n" \
        "\t-h Run in network client mode, connecting to the given host\n" \
        "\t-p Network port to use (default 8462)\n" \
        "\t-s Make the network client NOT use sendfile() to transfer data\n" \
        "\t-I Add devices found in <devs file>\n" \
        "\t-v Print program version info\n" \
        "\t-V Print program version info\n\n";

static void clear_events(struct pollfd *pfd)
{
        pfd->events = 0;
        pfd->revents = 0;
}

static inline int net_client_use_sendfile(void)
{
        return net_mode == Net_client && net_use_sendfile;
}

static inline int net_client_use_send(void)
{
        return net_mode == Net_client && !net_use_sendfile;
}

static inline int use_tracer_devpaths(void)
{
        return piped_output || net_client_use_send();
}

static inline int in_addr_eq(struct in_addr a, struct in_addr b)
{
        return a.s_addr == b.s_addr;
}

static inline void pdc_dr_update(struct devpath *dpp, int cpu, int data_read)
{
        dpp->stats[cpu].data_read += data_read;
}

static inline void pdc_nev_update(struct devpath *dpp, int cpu, int nevents)
{
        dpp->stats[cpu].nevents += nevents;
}

static void show_usage(char *prog)
{
        fprintf(stderr, "Usage: %s %s", prog, usage_str);
}

/*
 * Create a timespec 'msec' milliseconds into the future
 */
static inline void make_timespec(struct timespec *tsp, long delta_msec)
{
        struct timeval now;

        gettimeofday(&now, NULL);
        tsp->tv_sec = now.tv_sec;
        tsp->tv_nsec = 1000L * now.tv_usec;

        tsp->tv_nsec += (delta_msec * 1000000L);
        if (tsp->tv_nsec >= 1000000000L) {
                long secs = tsp->tv_nsec / 1000000000L;

                tsp->tv_sec += secs;
                tsp->tv_nsec -= (secs * 1000000000L);
        }
}

/*
 * Add a timer to ensure wait ends
 */
static void t_pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)
{
        struct timespec ts;

        make_timespec(&ts, 50);
        pthread_cond_timedwait(cond, mutex, &ts);
}

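/*
 * Tracer threads block in tracer_wait_unblock() until the main thread
 * sets tracers_run; unblock_tracers() is the main thread's side of that
 * handshake.
 */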
static void unblock_tracers(void)
{
        pthread_mutex_lock(&mt_mutex);
        tracers_run = 1;
        pthread_cond_broadcast(&mt_cond);
        pthread_mutex_unlock(&mt_mutex);
}

static void tracer_wait_unblock(struct tracer *tp)
{
        pthread_mutex_lock(&mt_mutex);
        while (!tp->is_done && !tracers_run)
                pthread_cond_wait(&mt_cond, &mt_mutex);
        pthread_mutex_unlock(&mt_mutex);
}

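/*
 * Called by each tracer thread to report its state to the main thread:
 * bump the appropriate counter under mt_mutex and wake any waiter.
 */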
static void tracer_signal_ready(struct tracer *tp,
                                enum thread_status th_status,
                                int status)
{
        pthread_mutex_lock(&mt_mutex);
        tp->status = status;

        if (th_status == Th_running)
                nthreads_running++;
        else if (th_status == Th_error)
                nthreads_error++;
        else
                nthreads_leaving++;

        pthread_cond_signal(&mt_cond);
        pthread_mutex_unlock(&mt_mutex);
}

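/*
 * Main-thread counterparts: wait until every started tracer has either
 * come up or failed, and (at shutdown) until all running tracers have
 * signalled that they are leaving.
 */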
static void wait_tracers_ready(int ncpus_started)
{
        pthread_mutex_lock(&mt_mutex);
        while ((nthreads_running + nthreads_error) < ncpus_started)
                t_pthread_cond_wait(&mt_cond, &mt_mutex);
        pthread_mutex_unlock(&mt_mutex);
}

static void wait_tracers_leaving(void)
{
        pthread_mutex_lock(&mt_mutex);
        while (nthreads_leaving < nthreads_running)
                t_pthread_cond_wait(&mt_cond, &mt_mutex);
        pthread_mutex_unlock(&mt_mutex);
}

static void init_mmap_info(struct mmap_info *mip)
{
        mip->buf_size = buf_size;
        mip->buf_nr = buf_nr;
        mip->pagesize = pagesize;
}

static void net_close_connection(int *fd)
{
        shutdown(*fd, SHUT_RDWR);
        close(*fd);
        *fd = -1;
}

static void dpp_free(struct devpath *dpp)
{
        if (dpp->stats)
                free(dpp->stats);
        if (dpp->ios)
                free(dpp->ios);
        if (dpp->path)
                free(dpp->path);
        if (dpp->buts_name)
                free(dpp->buts_name);
        free(dpp);
}

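/*
 * Pin the calling thread to the given CPU so each tracer reads its own
 * per-CPU relay file locally.
 */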
static int lock_on_cpu(int cpu)
{
        cpu_set_t cpu_mask;

        CPU_ZERO(&cpu_mask);
        CPU_SET(cpu, &cpu_mask);
        if (sched_setaffinity(0, sizeof(cpu_mask), &cpu_mask) < 0)
                return errno;

        return 0;
}

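/*
 * Try to raise the given rlimit; used to recover from EMFILE/ENFILE and
 * mlock() failures below. Returns 1 if the limit was raised (so the
 * failed call is worth retrying), 0 otherwise.
 */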
static int increase_limit(int resource, rlim_t increase)
{
        struct rlimit rlim;
        int save_errno = errno;

        if (!getrlimit(resource, &rlim)) {
                rlim.rlim_cur += increase;
                if (rlim.rlim_cur >= rlim.rlim_max)
                        rlim.rlim_max = rlim.rlim_cur + increase;

                if (!setrlimit(resource, &rlim))
                        return 1;
        }

        errno = save_errno;
        return 0;
}

static int handle_open_failure(void)
{
        if (errno == ENFILE || errno == EMFILE)
                return increase_limit(RLIMIT_NOFILE, 16);
        return 0;
}

static int handle_mem_failure(size_t length)
{
        if (errno == ENFILE)
                return handle_open_failure();
        else if (errno == ENOMEM)
                return increase_limit(RLIMIT_MEMLOCK, 2 * length);
        return 0;
}

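/*
 * The my_*() wrappers retry the underlying call as long as the failure
 * was resource-related and bumping the corresponding rlimit succeeded.
 */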
static FILE *my_fopen(const char *path, const char *mode)
{
        FILE *fp;

        do {
                fp = fopen(path, mode);
        } while (fp == NULL && handle_open_failure());

        return fp;
}

static int my_open(const char *path, int flags)
{
        int fd;

        do {
                fd = open(path, flags);
        } while (fd < 0 && handle_open_failure());

        return fd;
}

static int my_socket(int domain, int type, int protocol)
{
        int fd;

        do {
                fd = socket(domain, type, protocol);
        } while (fd < 0 && handle_open_failure());

        return fd;
}

static int my_accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen)
{
        int fd;

        do {
                fd = accept(sockfd, addr, addrlen);
        } while (fd < 0 && handle_open_failure());

        return fd;
}

static void *my_mmap(void *addr, size_t length, int prot, int flags, int fd,
                     off_t offset)
{
        void *new;

        do {
                new = mmap(addr, length, prot, flags, fd, offset);
        } while (new == MAP_FAILED && handle_mem_failure(length));

        return new;
}

static int my_mlock(struct tracer *tp,
                    const void *addr, size_t len)
{
        int ret, retry = 0;

        do {
                ret = mlock(addr, len);
                if ((retry >= 10) && tp && tp->is_done)
                        break;
                retry++;
        } while (ret < 0 && handle_mem_failure(len));

        return ret;
}

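/*
 * Ensure the output file has a locked, mmap()ed window big enough to
 * take the next read of up to 'maxlen' bytes: grow the file with
 * ftruncate() and map (at least) buf_nr sub-buffers' worth of space
 * past the current write offset.
 */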
static int setup_mmap(int fd, unsigned int maxlen,
                      struct mmap_info *mip,
                      struct tracer *tp)
{
        if (mip->fs_off + maxlen > mip->fs_buf_len) {
                unsigned long nr = max(16, mip->buf_nr);

                if (mip->fs_buf) {
                        munlock(mip->fs_buf, mip->fs_buf_len);
                        munmap(mip->fs_buf, mip->fs_buf_len);
                        mip->fs_buf = NULL;
                }

                mip->fs_off = mip->fs_size & (mip->pagesize - 1);
                mip->fs_buf_len = (nr * mip->buf_size) - mip->fs_off;
                mip->fs_max_size += mip->fs_buf_len;

                if (ftruncate(fd, mip->fs_max_size) < 0) {
                        perror("setup_mmap: ftruncate");
                        return 1;
                }

                mip->fs_buf = my_mmap(NULL, mip->fs_buf_len, PROT_WRITE,
                                      MAP_SHARED, fd,
                                      mip->fs_size - mip->fs_off);
                if (mip->fs_buf == MAP_FAILED) {
                        perror("setup_mmap: mmap");
                        return 1;
                }
                if (my_mlock(tp, mip->fs_buf, mip->fs_buf_len) < 0) {
                        perror("setup_mmap: mlock");
                        return 1;
                }
        }

        return 0;
}

static int __stop_trace(int fd)
{
        /*
         * Should be stopped, don't complain if it isn't
         */
        ioctl(fd, BLKTRACESTOP);
        return ioctl(fd, BLKTRACETEARDOWN);
}

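/*
 * Write a chunk of trace data to the piped output stream, retrying on
 * EINTR. EPIPE/EBADF are expected (and silent) when the consumer of a
 * pipe goes away.
 */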
static int write_data(char *buf, int len)
{
        int ret;

rewrite:
        ret = fwrite(buf, len, 1, pfp);
        if (ferror(pfp) || ret != 1) {
                if (errno == EINTR) {
                        clearerr(pfp);
                        goto rewrite;
                }

                if (!piped_output || (errno != EPIPE && errno != EBADF)) {
                        fprintf(stderr, "write(%d) failed: %d/%s\n",
                                len, errno, strerror(errno));
                }
                goto err;
        }

        fflush(pfp);
        return 0;

err:
        clearerr(pfp);
        return 1;
}

/*
 * Returns the number of bytes read (successfully)
 */
static int __net_recv_data(int fd, void *buf, unsigned int len)
{
        unsigned int bytes_left = len;

        while (bytes_left && !done) {
                int ret = recv(fd, buf, bytes_left, MSG_WAITALL);

                if (ret == 0)
                        break;
                else if (ret < 0) {
                        if (errno == EAGAIN) {
                                usleep(50);
                                continue;
                        }
                        perror("server: net_recv_data: recv failed");
                        break;
                } else {
                        buf += ret;
                        bytes_left -= ret;
                }
        }

        return len - bytes_left;
}

static int net_recv_data(int fd, void *buf, unsigned int len)
{
        return __net_recv_data(fd, buf, len);
}

/*
 * Returns number of bytes written
 */
static int net_send_data(int fd, void *buf, unsigned int buf_len)
{
        int ret;
        unsigned int bytes_left = buf_len;

        while (bytes_left) {
                ret = send(fd, buf, bytes_left, 0);
                if (ret < 0) {
                        perror("send");
                        break;
                }

                buf += ret;
                bytes_left -= ret;
        }

        return buf_len - bytes_left;
}

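/*
 * Build and send a blktrace_net_hdr describing 'len' bytes of trace data
 * for (buts_name, cpu). Returns non-zero if the full header could not be
 * sent.
 */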
static int net_send_header(int fd, int cpu, char *buts_name, int len)
{
        struct blktrace_net_hdr hdr;

        memset(&hdr, 0, sizeof(hdr));

        hdr.magic = BLK_IO_TRACE_MAGIC;
        strncpy(hdr.buts_name, buts_name, sizeof(hdr.buts_name));
        hdr.buts_name[sizeof(hdr.buts_name)-1] = '\0';
        hdr.cpu = cpu;
        hdr.max_cpus = ncpus;
        hdr.len = len;
        hdr.cl_id = getpid();
        hdr.buf_size = buf_size;
        hdr.buf_nr = buf_nr;
        hdr.page_size = pagesize;

        return net_send_data(fd, &hdr, sizeof(hdr)) != sizeof(hdr);
}

static void net_send_open_close(int fd, int cpu, char *buts_name, int len)
{
        struct blktrace_net_hdr ret_hdr;

        net_send_header(fd, cpu, buts_name, len);
        net_recv_data(fd, &ret_hdr, sizeof(ret_hdr));
}

static void net_send_open(int fd, int cpu, char *buts_name)
{
        net_send_open_close(fd, cpu, buts_name, 0);
}

static void net_send_close(int fd, char *buts_name, int drops)
{
        /*
         * Overload CPU w/ number of drops
         *
         * XXX: Need to clear/set done around call - done=1 (which
         * is true here) stops reads from happening... :-(
         */
        done = 0;
        net_send_open_close(fd, drops, buts_name, 1);
        done = 1;
}

static void ack_open_close(int fd, char *buts_name)
{
        net_send_header(fd, 0, buts_name, 2);
}

static void net_send_drops(int fd)
{
        struct list_head *p;

        __list_for_each(p, &devpaths) {
                struct devpath *dpp = list_entry(p, struct devpath, head);

                net_send_close(fd, dpp->buts_name, dpp->drops);
        }
}

/*
 * Returns:
 *       0: "EOF"
 *       1: OK
 *      -1: Error
 */
static int net_get_header(struct cl_conn *nc, struct blktrace_net_hdr *bnh)
{
        int bytes_read;
        int fl = fcntl(nc->fd, F_GETFL);

        fcntl(nc->fd, F_SETFL, fl | O_NONBLOCK);
        bytes_read = __net_recv_data(nc->fd, bnh, sizeof(*bnh));
        fcntl(nc->fd, F_SETFL, fl & ~O_NONBLOCK);

        if (bytes_read == sizeof(*bnh))
                return 1;
        else if (bytes_read == 0)
                return 0;
        else
                return -1;
}

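/*
 * Resolve the -h <hostname> argument into hostname_addr, first as a
 * dotted quad, then via gethostbyname().
 */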
static int net_setup_addr(void)
{
        struct sockaddr_in *addr = &hostname_addr;

        memset(addr, 0, sizeof(*addr));
        addr->sin_family = AF_INET;
        addr->sin_port = htons(net_port);

        if (inet_aton(hostname, &addr->sin_addr) != 1) {
                struct hostent *hent;
retry:
                hent = gethostbyname(hostname);
                if (!hent) {
                        if (h_errno == TRY_AGAIN) {
                                usleep(100);
                                goto retry;
                        } else if (h_errno == NO_RECOVERY) {
                                fprintf(stderr, "gethostbyname(%s): "
                                        "non-recoverable error encountered\n",
                                        hostname);
                        } else {
                                /*
                                 * HOST_NOT_FOUND, NO_ADDRESS or NO_DATA
                                 */
                                fprintf(stderr, "Host %s not found\n",
                                        hostname);
                        }
                        return 1;
                }

                memcpy(&addr->sin_addr, hent->h_addr, 4);
                strcpy(hostname, hent->h_name);
        }

        return 0;
}

static int net_setup_client(void)
{
        int fd;
        struct sockaddr_in *addr = &hostname_addr;

        fd = my_socket(AF_INET, SOCK_STREAM, 0);
        if (fd < 0) {
                perror("client: socket");
                return -1;
        }

        if (connect(fd, (struct sockaddr *)addr, sizeof(*addr)) < 0) {
                if (errno == ECONNREFUSED)
                        fprintf(stderr,
                                "\nclient: Connection to %s refused, "
                                "perhaps the server is not started?\n\n",
                                hostname);
                else
                        perror("client: connect");

                close(fd);
                return -1;
        }

        return fd;
}

static int open_client_connections(void)
{
        int cpu;

        cl_fds = calloc(ncpus, sizeof(*cl_fds));
        for (cpu = 0; cpu < ncpus; cpu++) {
                cl_fds[cpu] = net_setup_client();
                if (cl_fds[cpu] < 0)
                        goto err;
        }
        return 0;

err:
        while (cpu > 0)
                close(cl_fds[--cpu]);
        free(cl_fds);
        return 1;
}

static void close_client_connections(void)
{
        if (cl_fds) {
                int cpu, *fdp;

                for (cpu = 0, fdp = cl_fds; cpu < ncpus; cpu++, fdp++) {
                        if (*fdp >= 0) {
                                net_send_drops(*fdp);
                                net_close_connection(fdp);
                        }
                }
                free(cl_fds);
        }
}

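/*
 * Issue BLKTRACESETUP for every device on the devpaths list, recording
 * the kernel-assigned buts name and allocating per-CPU stats.
 */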
static void setup_buts(void)
{
        struct list_head *p;

        __list_for_each(p, &devpaths) {
                struct blk_user_trace_setup buts;
                struct devpath *dpp = list_entry(p, struct devpath, head);

                memset(&buts, 0, sizeof(buts));
                buts.buf_size = buf_size;
                buts.buf_nr = buf_nr;
                buts.act_mask = act_mask;

                if (ioctl(dpp->fd, BLKTRACESETUP, &buts) >= 0) {
                        dpp->ncpus = ncpus;
                        dpp->buts_name = strdup(buts.name);
                        if (dpp->stats)
                                free(dpp->stats);
                        dpp->stats = calloc(dpp->ncpus, sizeof(*dpp->stats));
                } else
                        fprintf(stderr, "BLKTRACESETUP(2) %s failed: %d/%s\n",
                                dpp->path, errno, strerror(errno));
        }
}

static void start_buts(void)
{
        struct list_head *p;

        __list_for_each(p, &devpaths) {
                struct devpath *dpp = list_entry(p, struct devpath, head);

                if (ioctl(dpp->fd, BLKTRACESTART) < 0) {
                        fprintf(stderr, "BLKTRACESTART %s failed: %d/%s\n",
                                dpp->path, errno, strerror(errno));
                }
        }
}

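/*
 * Read the kernel's dropped-event count for a device from debugfs.
 * A missing 'dropped' file just means the kernel doesn't support the
 * count, so that case is not an error.
 */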
static int get_drops(struct devpath *dpp)
{
        int fd, drops = 0;
        char fn[MAXPATHLEN + 64], tmp[256] = { 0 };

        snprintf(fn, sizeof(fn), "%s/block/%s/dropped", debugfs_path,
                 dpp->buts_name);

        fd = my_open(fn, O_RDONLY);
        if (fd < 0) {
                /*
                 * This may be ok: the kernel may not support
                 * dropped counts.
                 */
                if (errno != ENOENT)
                        fprintf(stderr, "Could not open %s: %d/%s\n",
                                fn, errno, strerror(errno));
                return 0;
        } else if (read(fd, tmp, sizeof(tmp) - 1) < 0) {
                fprintf(stderr, "Could not read %s: %d/%s\n",
                        fn, errno, strerror(errno));
        } else
                drops = atoi(tmp);
        close(fd);

        return drops;
}

static void get_all_drops(void)
{
        struct list_head *p;

        __list_for_each(p, &devpaths) {
                struct devpath *dpp = list_entry(p, struct devpath, head);

                dpp->drops = get_drops(dpp);
        }
}

static inline struct trace_buf *alloc_trace_buf(int cpu, int bufsize)
{
        struct trace_buf *tbp;

        tbp = malloc(sizeof(*tbp) + bufsize);
        INIT_LIST_HEAD(&tbp->head);
        tbp->len = 0;
        tbp->buf = (void *)(tbp + 1);
        tbp->cpu = cpu;
        tbp->dpp = NULL;        /* Will be set when tbp is added */

        return tbp;
}

static void free_tracer_heads(struct devpath *dpp)
{
        int cpu;
        struct tracer_devpath_head *hd;

        for (cpu = 0, hd = dpp->heads; cpu < ncpus; cpu++, hd++) {
                if (hd->prev)
                        free(hd->prev);

                pthread_mutex_destroy(&hd->mutex);
        }
        free(dpp->heads);
}

static int setup_tracer_devpaths(void)
{
        struct list_head *p;

        if (net_client_use_send())
                if (open_client_connections())
                        return 1;

        __list_for_each(p, &devpaths) {
                int cpu;
                struct tracer_devpath_head *hd;
                struct devpath *dpp = list_entry(p, struct devpath, head);

                dpp->heads = calloc(ncpus, sizeof(struct tracer_devpath_head));
                for (cpu = 0, hd = dpp->heads; cpu < ncpus; cpu++, hd++) {
                        INIT_LIST_HEAD(&hd->head);
                        pthread_mutex_init(&hd->mutex, NULL);
                        hd->prev = NULL;
                }
        }

        return 0;
}

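/*
 * Queue a filled trace_buf on the per-device, per-CPU list for the main
 * thread to consume, and hand the caller a fresh buffer to keep reading
 * into.
 */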
static inline void add_trace_buf(struct devpath *dpp, int cpu,
                                                struct trace_buf **tbpp)
{
        struct trace_buf *tbp = *tbpp;
        struct tracer_devpath_head *hd = &dpp->heads[cpu];

        tbp->dpp = dpp;

        pthread_mutex_lock(&hd->mutex);
        list_add_tail(&tbp->head, &hd->head);
        pthread_mutex_unlock(&hd->mutex);

        *tbpp = alloc_trace_buf(cpu, buf_size);
}

static inline void incr_entries(int entries_handled)
{
        pthread_mutex_lock(&dp_mutex);
        if (dp_entries == 0)
                pthread_cond_signal(&dp_cond);
        dp_entries += entries_handled;
        pthread_mutex_unlock(&dp_mutex);
}

static void decr_entries(int handled)
{
        pthread_mutex_lock(&dp_mutex);
        dp_entries -= handled;
        pthread_mutex_unlock(&dp_mutex);
}

static int wait_empty_entries(void)
{
        pthread_mutex_lock(&dp_mutex);
        while (!done && dp_entries == 0)
                t_pthread_cond_wait(&dp_cond, &dp_mutex);
        pthread_mutex_unlock(&dp_mutex);

        return !done;
}

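/*
 * Add a device to the devpaths list, rejecting duplicates and paths that
 * cannot be opened.
 */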
static int add_devpath(char *path)
{
        int fd;
        struct devpath *dpp;
        struct list_head *p;

        /*
         * Verify device is not duplicated
         */
        __list_for_each(p, &devpaths) {
                struct devpath *tmp = list_entry(p, struct devpath, head);
                if (!strcmp(tmp->path, path))
                        return 0;
        }
        /*
         * Verify device is valid before going too far
         */
        fd = my_open(path, O_RDONLY | O_NONBLOCK);
        if (fd < 0) {
                fprintf(stderr, "Invalid path %s specified: %d/%s\n",
                        path, errno, strerror(errno));
                return 1;
        }

        dpp = malloc(sizeof(*dpp));
        memset(dpp, 0, sizeof(*dpp));
        dpp->path = strdup(path);
        dpp->fd = fd;
        ndevs++;
        list_add_tail(&dpp->head, &devpaths);

        return 0;
}

static void rel_devpaths(void)
{
        struct list_head *p, *q;

        list_for_each_safe(p, q, &devpaths) {
                struct devpath *dpp = list_entry(p, struct devpath, head);

                list_del(&dpp->head);
                __stop_trace(dpp->fd);
                close(dpp->fd);

                if (dpp->heads)
                        free_tracer_heads(dpp);

                dpp_free(dpp);
                ndevs--;
        }
}

static int flush_subbuf_net(struct trace_buf *tbp)
{
        int fd = cl_fds[tbp->cpu];
        struct devpath *dpp = tbp->dpp;

        if (net_send_header(fd, tbp->cpu, dpp->buts_name, tbp->len))
                return 1;
        else if (net_send_data(fd, tbp->buf, tbp->len) != tbp->len)
                return 1;

        return 0;
}

static int
handle_list_net(__attribute__((__unused__))struct tracer_devpath_head *hd,
                struct list_head *list)
{
        struct trace_buf *tbp;
        struct list_head *p, *q;
        int entries_handled = 0;

        list_for_each_safe(p, q, list) {
                tbp = list_entry(p, struct trace_buf, head);

                list_del(&tbp->head);
                entries_handled++;

                if (cl_fds[tbp->cpu] >= 0) {
                        if (flush_subbuf_net(tbp)) {
                                close(cl_fds[tbp->cpu]);
                                cl_fds[tbp->cpu] = -1;
                        }
                }

                free(tbp);
        }

        return entries_handled;
}

/*
 * Tack 'tbp's buf onto the tail of 'prev's buf
 */
static struct trace_buf *tb_combine(struct trace_buf *prev,
                                    struct trace_buf *tbp)
{
        unsigned long tot_len;

        tot_len = prev->len + tbp->len;
        if (tot_len > buf_size) {
                /*
                 * tbp->head isn't connected (it was 'prev'
                 * so it had been taken off of the list
                 * before). Therefore, we can realloc
                 * the whole structures, as the other fields
                 * are "static".
                 */
                prev = realloc(prev, sizeof(*prev) + tot_len);
                prev->buf = (void *)(prev + 1);
        }

        memcpy(prev->buf + prev->len, tbp->buf, tbp->len);
        prev->len = tot_len;

        free(tbp);
        return prev;
}

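/*
 * File/pipe variant of the list handler: coalesce queued buffers with any
 * leftover partial trace from the previous pass, write out every complete
 * trace, and carry the remainder over in hd->prev.
 */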
static int handle_list_file(struct tracer_devpath_head *hd,
                            struct list_head *list)
{
        int off, t_len, nevents;
        struct blk_io_trace *t;
        struct list_head *p, *q;
        int entries_handled = 0;
        struct trace_buf *tbp, *prev;

        prev = hd->prev;
        list_for_each_safe(p, q, list) {
                tbp = list_entry(p, struct trace_buf, head);
                list_del(&tbp->head);
                entries_handled++;

                /*
                 * If there was some leftover before, tack this new
                 * entry onto the tail of the previous one.
                 */
                if (prev)
                        tbp = tb_combine(prev, tbp);

                /*
                 * See how many whole traces there are - send them
                 * all out in one go.
                 */
                off = 0;
                nevents = 0;
                while (off + (int)sizeof(*t) <= tbp->len) {
                        t = (struct blk_io_trace *)(tbp->buf + off);
                        t_len = sizeof(*t) + t->pdu_len;
                        if (off + t_len > tbp->len)
                                break;

                        off += t_len;
                        nevents++;
                }
                if (nevents)
                        pdc_nev_update(tbp->dpp, tbp->cpu, nevents);

                /*
                 * Write any full set of traces, any remaining data is kept
                 * for the next pass.
                 */
                if (off) {
                        if (write_data(tbp->buf, off) || off == tbp->len) {
                                free(tbp);
                                prev = NULL;
                        } else {
                                /*
                                 * Move valid data to beginning of buffer
                                 */
                                tbp->len -= off;
                                memmove(tbp->buf, tbp->buf + off, tbp->len);
                                prev = tbp;
                        }
                } else
                        prev = tbp;
        }
        hd->prev = prev;

        return entries_handled;
}

static void __process_trace_bufs(void)
{
        int cpu;
        struct list_head *p;
        struct list_head list;
        int handled = 0;

        __list_for_each(p, &devpaths) {
                struct devpath *dpp = list_entry(p, struct devpath, head);
                struct tracer_devpath_head *hd = dpp->heads;

                for (cpu = 0; cpu < ncpus; cpu++, hd++) {
                        pthread_mutex_lock(&hd->mutex);
                        if (list_empty(&hd->head)) {
                                pthread_mutex_unlock(&hd->mutex);
                                continue;
                        }

                        list_replace_init(&hd->head, &list);
                        pthread_mutex_unlock(&hd->mutex);

                        handled += handle_list(hd, &list);
                }
        }

        if (handled)
                decr_entries(handled);
}

static void process_trace_bufs(void)
{
        while (wait_empty_entries())
                __process_trace_bufs();
}

static void clean_trace_bufs(void)
{
        /*
         * No mutex needed here: we're only reading from the lists,
         * tracers are done
         */
        while (dp_entries)
                __process_trace_bufs();
}

static inline void read_err(int cpu, char *ifn)
{
        if (errno != EAGAIN)
                fprintf(stderr, "Thread %d failed read of %s: %d/%s\n",
                        cpu, ifn, errno, strerror(errno));
}

static int net_sendfile(struct io_info *iop)
{
        int ret;

        ret = sendfile(iop->ofd, iop->ifd, NULL, iop->ready);
        if (ret < 0) {
                perror("sendfile");
                return 1;
        } else if (ret < (int)iop->ready) {
                fprintf(stderr, "short sendfile send (%d of %d)\n",
                        ret, iop->ready);
                return 1;
        }

        return 0;
}

static inline int net_sendfile_data(struct tracer *tp, struct io_info *iop)
{
        struct devpath *dpp = iop->dpp;

        if (net_send_header(iop->ofd, tp->cpu, dpp->buts_name, iop->ready))
                return 1;
        return net_sendfile(iop);
}

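/*
 * Build the output file name: [output_dir/][host-timestamp/]name.blktrace.cpu,
 * creating the destination directory if needed.
 */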
static int fill_ofname(struct io_info *iop, int cpu)
{
        int len;
        struct stat sb;
        char *dst = iop->ofn;

        if (output_dir)
                len = snprintf(iop->ofn, sizeof(iop->ofn), "%s/", output_dir);
        else
                len = snprintf(iop->ofn, sizeof(iop->ofn), "./");

        if (net_mode == Net_server) {
                struct cl_conn *nc = iop->nc;

                len += sprintf(dst + len, "%s-", nc->ch->hostname);
                len += strftime(dst + len, 64, "%F-%T/",
                                gmtime(&iop->dpp->cl_connect_time));
        }

        if (stat(iop->ofn, &sb) < 0) {
                if (errno != ENOENT) {
                        fprintf(stderr,
                                "Destination dir %s stat failed: %d/%s\n",
                                iop->ofn, errno, strerror(errno));
                        return 1;
                }
                /*
                 * There is no synchronization between multiple threads
                 * trying to create the directory at once.  It's harmless
                 * to let them try, so just detect the problem and move on.
                 */
                if (mkdir(iop->ofn, 0755) < 0 && errno != EEXIST) {
                        fprintf(stderr,
                                "Destination dir %s can't be made: %d/%s\n",
                                iop->ofn, errno, strerror(errno));
                        return 1;
                }
        }

        if (output_name)
                snprintf(iop->ofn + len, sizeof(iop->ofn) - len,
                         "%s.blktrace.%d", output_name, cpu);
        else
                snprintf(iop->ofn + len, sizeof(iop->ofn) - len,
                         "%s.blktrace.%d", iop->dpp->buts_name, cpu);

        return 0;
}

static int set_vbuf(struct io_info *iop, int mode, size_t size)
{
        iop->obuf = malloc(size);
        if (setvbuf(iop->ofp, iop->obuf, mode, size)) {
                fprintf(stderr, "setvbuf(%s, %d) failed: %d/%s\n",
                        iop->dpp->path, (int)size, errno,
                        strerror(errno));
                free(iop->obuf);
                return 1;
        }

        return 0;
}

static int iop_open(struct io_info *iop, int cpu)
{
        iop->ofd = -1;
        if (fill_ofname(iop, cpu))
                return 1;

        iop->ofp = my_fopen(iop->ofn, "w+");
        if (iop->ofp == NULL) {
                fprintf(stderr, "Open output file %s failed: %d/%s\n",
                        iop->ofn, errno, strerror(errno));
                return 1;
        }

        if (set_vbuf(iop, _IOLBF, FILE_VBUF_SIZE)) {
                fprintf(stderr, "set_vbuf for file %s failed: %d/%s\n",
                        iop->ofn, errno, strerror(errno));
                fclose(iop->ofp);
                return 1;
        }

        iop->ofd = fileno(iop->ofp);
        return 0;
}

static void close_iop(struct io_info *iop)
{
        struct mmap_info *mip = &iop->mmap_info;

        if (mip->fs_buf)
                munmap(mip->fs_buf, mip->fs_buf_len);

        if (!piped_output) {
                if (ftruncate(fileno(iop->ofp), mip->fs_size) < 0) {
                        fprintf(stderr,
                                "Ignoring err: ftruncate(%s): %d/%s\n",
                                iop->ofn, errno, strerror(errno));
                }
        }

        if (iop->ofp)
                fclose(iop->ofp);
        if (iop->obuf)
                free(iop->obuf);
}

static void close_ios(struct tracer *tp)
{
        while (tp->nios > 0) {
                struct io_info *iop = &tp->ios[--tp->nios];

                iop->dpp->drops = get_drops(iop->dpp);
                if (iop->ifd >= 0)
                        close(iop->ifd);

                if (iop->ofp)
                        close_iop(iop);
                else if (iop->ofd >= 0) {
                        struct devpath *dpp = iop->dpp;

                        net_send_close(iop->ofd, dpp->buts_name, dpp->drops);
                        net_close_connection(&iop->ofd);
                }
        }

        free(tp->ios);
        free(tp->pfds);
}

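/*
 * Per-tracer setup: open the per-CPU relay 'trace<cpu>' file for every
 * device and set up the matching output target (file, network socket, or
 * nothing for piped output).
 */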
static int open_ios(struct tracer *tp)
{
        struct pollfd *pfd;
        struct io_info *iop;
        struct list_head *p;

        tp->ios = calloc(ndevs, sizeof(struct io_info));
        tp->pfds = calloc(ndevs, sizeof(struct pollfd));

        tp->nios = 0;
        iop = tp->ios;
        pfd = tp->pfds;
        __list_for_each(p, &devpaths) {
                struct devpath *dpp = list_entry(p, struct devpath, head);

                iop->dpp = dpp;
                iop->ofd = -1;
                snprintf(iop->ifn, sizeof(iop->ifn), "%s/block/%s/trace%d",
                        debugfs_path, dpp->buts_name, tp->cpu);

                iop->ifd = my_open(iop->ifn, O_RDONLY | O_NONBLOCK);
                if (iop->ifd < 0) {
                        fprintf(stderr, "Thread %d failed open %s: %d/%s\n",
                                tp->cpu, iop->ifn, errno, strerror(errno));
                        return 1;
                }

                init_mmap_info(&iop->mmap_info);

                pfd->fd = iop->ifd;
                pfd->events = POLLIN;

                if (piped_output)
                        ;
                else if (net_client_use_sendfile()) {
                        iop->ofd = net_setup_client();
                        if (iop->ofd < 0)
                                goto err;
                        net_send_open(iop->ofd, tp->cpu, dpp->buts_name);
                } else if (net_mode == Net_none) {
                        if (iop_open(iop, tp->cpu))
                                goto err;
                } else {
                        /*
                         * This ensures that the server knows about all
                         * connections & devices before _any_ closes
                         */
                        net_send_open(cl_fds[tp->cpu], tp->cpu, dpp->buts_name);
                }

                pfd++;
                iop++;
                tp->nios++;
        }

        return 0;

err:
        close(iop->ifd);        /* tp->nios _not_ bumped */
        close_ios(tp);
        return 1;
}

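/*
 * poll() handler for plain file output: append each readable relay file's
 * data directly into its mmap()ed output window.
 */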
static int handle_pfds_file(struct tracer *tp, int nevs, int force_read)
{
        struct mmap_info *mip;
        int i, ret, nentries = 0;
        struct pollfd *pfd = tp->pfds;
        struct io_info *iop = tp->ios;

        for (i = 0; nevs > 0 && i < ndevs; i++, pfd++, iop++) {
                if (pfd->revents & POLLIN || force_read) {
                        mip = &iop->mmap_info;

                        ret = setup_mmap(iop->ofd, buf_size, mip, tp);
                        if (ret) {
1697                                 pfd->events = 0;
1698                                 break;
1699                         }
1700
1701                         ret = read(iop->ifd, mip->fs_buf + mip->fs_off,
1702                                    buf_size);
1703                         if (ret > 0) {
1704                                 pdc_dr_update(iop->dpp, tp->cpu, ret);
1705                                 mip->fs_size += ret;
1706                                 mip->fs_off += ret;
1707                                 nentries++;
1708                         } else if (ret == 0) {
1709                                 /*
1710                                  * Short reads after we're done stop us
1711                                  * from trying reads.
1712                                  */
1713                                 if (tp->is_done)
1714                                         clear_events(pfd);
1715                         } else {
1716                                 read_err(tp->cpu, iop->ifn);
1717                                 if (errno != EAGAIN || tp->is_done)
1718                                         clear_events(pfd);
1719                         }
1720                         nevs--;
1721                 }
1722         }
1723
1724         return nentries;
1725 }
1726
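/*
 * handle_pfds_netclient() - PFD handler for the sendfile(2) network
 * client mode.
 *
 * Rather than copying trace data through user space, fstat() the relay
 * file to see how much new data is queued and hand the delta to
 * net_sendfile_data(), which presumably pushes it to the server via
 * sendfile. Only byte counts are recorded here; event counts are
 * estimated later, in show_stats().
 */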
1727 static int handle_pfds_netclient(struct tracer *tp, int nevs, int force_read)
1728 {
        struct stat sb;
        int i, nentries = 0;
        struct pollfd *pfd = tp->pfds;
        struct io_info *iop = tp->ios;

        for (i = 0; i < ndevs; i++, pfd++, iop++) {
1736                 if (pfd->revents & POLLIN || force_read) {
1737                         if (fstat(iop->ifd, &sb) < 0) {
1738                                 perror(iop->ifn);
1739                                 pfd->events = 0;
1740                         } else if (sb.st_size > (off_t)iop->data_queued) {
1741                                 iop->ready = sb.st_size - iop->data_queued;
1742                                 iop->data_queued = sb.st_size;
1743
1744                                 if (!net_sendfile_data(tp, iop)) {
1745                                         pdc_dr_update(iop->dpp, tp->cpu,
1746                                                       iop->ready);
1747                                         nentries++;
1748                                 } else
1749                                         clear_events(pfd);
1750                         }
1751                         if (--nevs == 0)
1752                                 break;
1753                 }
1754         }
1755
1756         if (nentries)
1757                 incr_entries(nentries);
1758
1759         return nentries;
1760 }
1761
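/*
 * handle_pfds_entries() - PFD handler for piped output and the
 * non-sendfile network client.
 *
 * Each ready relay fd is read into a trace buffer that is then queued
 * for the processing thread; add_trace_buf() is assumed to swap in a
 * fresh buffer through &tbp, which is why the last, unqueued buffer is
 * freed after the loop.
 */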
1762 static int handle_pfds_entries(struct tracer *tp, int nevs, int force_read)
1763 {
1764         int i, nentries = 0;
1765         struct trace_buf *tbp;
1766         struct pollfd *pfd = tp->pfds;
1767         struct io_info *iop = tp->ios;
1768
1769         tbp = alloc_trace_buf(tp->cpu, buf_size);
1770         for (i = 0; i < ndevs; i++, pfd++, iop++) {
1771                 if (pfd->revents & POLLIN || force_read) {
1772                         tbp->len = read(iop->ifd, tbp->buf, buf_size);
1773                         if (tbp->len > 0) {
1774                                 pdc_dr_update(iop->dpp, tp->cpu, tbp->len);
1775                                 add_trace_buf(iop->dpp, tp->cpu, &tbp);
1776                                 nentries++;
1777                         } else if (tbp->len == 0) {
                                /*
                                 * A zero-length read after we're done
                                 * means the relay buffer is drained:
                                 * stop trying reads on this fd.
                                 */
1782                                 if (tp->is_done)
1783                                         clear_events(pfd);
1784                         } else {
1785                                 read_err(tp->cpu, iop->ifn);
1786                                 if (errno != EAGAIN || tp->is_done)
1787                                         clear_events(pfd);
1788                         }
1789                         if (!piped_output && --nevs == 0)
1790                                 break;
1791                 }
1792         }
1793         free(tbp);
1794
1795         if (nentries)
1796                 incr_entries(nentries);
1797
1798         return nentries;
1799 }
1800
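/*
 * thread_main() - Body of each per-CPU tracer thread.
 *
 * Pins itself to its CPU (lock_on_cpu()), opens the per-device relay
 * files, then polls and drains them until is_done is set; a final
 * forced-read pass collects whatever remains once tracing has stopped.
 */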
1801 static void *thread_main(void *arg)
1802 {
1803         int ret, ndone, to_val;
1804         struct tracer *tp = arg;
1805
1806         ret = lock_on_cpu(tp->cpu);
1807         if (ret)
1808                 goto err;
1809
1810         ret = open_ios(tp);
1811         if (ret)
1812                 goto err;
1813
1814         if (piped_output)
1815                 to_val = 50;            /* Frequent partial handles */
1816         else
1817                 to_val = 500;           /* 1/2 second intervals */
1818
1820         tracer_signal_ready(tp, Th_running, 0);
1821         tracer_wait_unblock(tp);
1822
        while (!tp->is_done) {
                ndone = poll(tp->pfds, ndevs, to_val);
                if (ndone < 0) {
                        if (errno != EINTR)
                                fprintf(stderr,
                                        "Thread %d poll failed: %d/%s\n",
                                        tp->cpu, errno, strerror(errno));
                } else if (ndone || piped_output)
                        (void)handle_pfds(tp, ndone, piped_output);
        }
1831
1832         /*
1833          * Trace is stopped, pull data until we get a short read
1834          */
1835         while (handle_pfds(tp, ndevs, 1) > 0)
1836                 ;
1837
1838         close_ios(tp);
1839         tracer_signal_ready(tp, Th_leaving, 0);
1840         return NULL;
1841
1842 err:
1843         tracer_signal_ready(tp, Th_error, ret);
1844         return NULL;
1845 }
1846
1847 static int start_tracer(int cpu)
1848 {
1849         struct tracer *tp;
1850
1851         tp = malloc(sizeof(*tp));
1852         memset(tp, 0, sizeof(*tp));
1853
1854         INIT_LIST_HEAD(&tp->head);
1855         tp->status = 0;
1856         tp->cpu = cpu;
1857
1858         if (pthread_create(&tp->thread, NULL, thread_main, tp)) {
1859                 fprintf(stderr, "FAILED to start thread on CPU %d: %d/%s\n",
1860                         cpu, errno, strerror(errno));
1861                 free(tp);
1862                 return 1;
1863         }
1864
1865         list_add_tail(&tp->head, &tracers);
1866         return 0;
1867 }
1868
1869 static void start_tracers(void)
1870 {
1871         int cpu;
1872         struct list_head *p;
1873
1874         for (cpu = 0; cpu < ncpus; cpu++)
1875                 if (start_tracer(cpu))
1876                         break;
1877
1878         wait_tracers_ready(cpu);
1879
1880         __list_for_each(p, &tracers) {
1881                 struct tracer *tp = list_entry(p, struct tracer, head);
1882                 if (tp->status)
1883                         fprintf(stderr,
1884                                 "FAILED to start thread on CPU %d: %d/%s\n",
1885                                 tp->cpu, tp->status, strerror(tp->status));
1886         }
1887 }
1888
1889 static void stop_tracers(void)
1890 {
1891         struct list_head *p;
1892
1893         /*
1894          * Stop the tracing - makes the tracer threads clean up quicker.
1895          */
1896         __list_for_each(p, &devpaths) {
1897                 struct devpath *dpp = list_entry(p, struct devpath, head);
1898                 (void)ioctl(dpp->fd, BLKTRACESTOP);
1899         }
1900
1901         /*
1902          * Tell each tracer to quit
1903          */
1904         __list_for_each(p, &tracers) {
1905                 struct tracer *tp = list_entry(p, struct tracer, head);
1906                 tp->is_done = 1;
1907         }
1908 }
1909
1910 static void del_tracers(void)
1911 {
1912         struct list_head *p, *q;
1913
1914         list_for_each_safe(p, q, &tracers) {
1915                 struct tracer *tp = list_entry(p, struct tracer, head);
1916
1917                 list_del(&tp->head);
1918                 free(tp);
1919         }
1920 }
1921
1922 static void wait_tracers(void)
1923 {
1924         struct list_head *p;
1925
1926         if (use_tracer_devpaths())
1927                 process_trace_bufs();
1928
1929         wait_tracers_leaving();
1930
1931         __list_for_each(p, &tracers) {
1932                 int ret;
1933                 struct tracer *tp = list_entry(p, struct tracer, head);
1934
1935                 ret = pthread_join(tp->thread, NULL);
1936                 if (ret)
                        fprintf(stderr, "Thread %d join failed: %d\n",
                                tp->cpu, ret);
1939         }
1940
1941         if (use_tracer_devpaths())
1942                 clean_trace_bufs();
1943
1944         get_all_drops();
1945 }
1946
1947 static void exit_tracing(void)
1948 {
1949         signal(SIGINT, SIG_IGN);
1950         signal(SIGHUP, SIG_IGN);
1951         signal(SIGTERM, SIG_IGN);
1952         signal(SIGALRM, SIG_IGN);
1953
1954         stop_tracers();
1955         wait_tracers();
1956         del_tracers();
1957         rel_devpaths();
1958 }
1959
1960 static void handle_sigint(__attribute__((__unused__)) int sig)
1961 {
1962         done = 1;
1963         stop_tracers();
1964 }
1965
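/*
 * show_stats() - Print per-CPU and per-device event/data totals.
 *
 * Where only byte counts were gathered (e.g. the sendfile client
 * path), event counts are estimated from data_read, so they are only
 * approximate for traces that carry PDU payload.
 */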
1966 static void show_stats(struct list_head *devpaths)
1967 {
1968         FILE *ofp;
1969         struct list_head *p;
1970         unsigned long long nevents, data_read;
1971         unsigned long long total_drops = 0;
1972         unsigned long long total_events = 0;
1973
1974         if (piped_output)
1975                 ofp = my_fopen("/dev/null", "w");
1976         else
1977                 ofp = stdout;
1978
1979         __list_for_each(p, devpaths) {
1980                 int cpu;
1981                 struct pdc_stats *sp;
1982                 struct devpath *dpp = list_entry(p, struct devpath, head);
1983
1984                 if (net_mode == Net_server)
1985                         printf("server: end of run for %s:%s\n",
1986                                 dpp->ch->hostname, dpp->buts_name);
1987
1988                 data_read = 0;
1989                 nevents = 0;
1990
1991                 fprintf(ofp, "=== %s ===\n", dpp->buts_name);
1992                 for (cpu = 0, sp = dpp->stats; cpu < dpp->ncpus; cpu++, sp++) {
1993                         /*
1994                          * Estimate events if not known...
1995                          */
1996                         if (sp->nevents == 0) {
1997                                 sp->nevents = sp->data_read /
1998                                                 sizeof(struct blk_io_trace);
1999                         }
2000
2001                         fprintf(ofp,
2002                                 "  CPU%3d: %20llu events, %8llu KiB data\n",
2003                                 cpu, sp->nevents, (sp->data_read + 1023) >> 10);
2004
2005                         data_read += sp->data_read;
2006                         nevents += sp->nevents;
2007                 }
2008
                fprintf(ofp, "  Total:  %20llu events (dropped %llu),"
                             " %8llu KiB data\n", nevents,
                             dpp->drops, (data_read + 1023) >> 10);
2012
2013                 total_drops += dpp->drops;
2014                 total_events += (nevents + dpp->drops);
2015         }
2016
2017         fflush(ofp);
2018         if (piped_output)
2019                 fclose(ofp);
2020
2021         if (total_drops) {
2022                 double drops_ratio = 1.0;
2023
2024                 if (total_events)
2025                         drops_ratio = (double)total_drops/(double)total_events;
2026
2027                 fprintf(stderr, "\nYou have %llu (%5.1lf%%) dropped events\n"
2028                                 "Consider using a larger buffer size (-b) "
2029                                 "and/or more buffers (-n)\n",
2030                         total_drops, 100.0 * drops_ratio);
2031         }
2032 }
2033
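/*
 * handle_args() - Parse the command line and pick the PFD handler.
 *
 * Note that -b takes the buffer size in KiB (1..16384) and is shifted
 * into bytes below. A hypothetical invocation tracing two devices with
 * 8 MiB x 8 buffers, piping binary output straight into blkparse:
 *
 *	blktrace -d /dev/sda -d /dev/sdb -b 8192 -n 8 -o - | blkparse -i -
 */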
2034 static int handle_args(int argc, char *argv[])
2035 {
2036         int c, i;
2037         struct statfs st;
2038         int act_mask_tmp = 0;
2039
2040         while ((c = getopt_long(argc, argv, S_OPTS, l_opts, NULL)) >= 0) {
2041                 switch (c) {
2042                 case 'a':
2043                         i = find_mask_map(optarg);
2044                         if (i < 0) {
2045                                 fprintf(stderr, "Invalid action mask %s\n",
2046                                         optarg);
2047                                 return 1;
2048                         }
2049                         act_mask_tmp |= i;
2050                         break;
2051
2052                 case 'A':
2053                         if ((sscanf(optarg, "%x", &i) != 1) ||
2054                                                         !valid_act_opt(i)) {
2055                                 fprintf(stderr,
2056                                         "Invalid set action mask %s/0x%x\n",
2057                                         optarg, i);
2058                                 return 1;
2059                         }
2060                         act_mask_tmp = i;
2061                         break;
2062
2063                 case 'd':
2064                         if (add_devpath(optarg) != 0)
2065                                 return 1;
2066                         break;
2067
2068                 case 'I': {
2069                         char dev_line[256];
2070                         FILE *ifp = my_fopen(optarg, "r");
2071
2072                         if (!ifp) {
2073                                 fprintf(stderr,
2074                                         "Invalid file for devices %s\n",
2075                                         optarg);
2076                                 return 1;
2077                         }
2078
                        while (fscanf(ifp, "%255s\n", dev_line) == 1)
                                if (add_devpath(dev_line) != 0) {
                                        fclose(ifp);
                                        return 1;
                                }
                        fclose(ifp);
                        break;
2083                 }
2084
2085                 case 'r':
2086                         debugfs_path = optarg;
2087                         break;
2088
2089                 case 'o':
2090                         output_name = optarg;
2091                         break;
2092                 case 'k':
2093                         kill_running_trace = 1;
2094                         break;
2095                 case 'w':
2096                         stop_watch = atoi(optarg);
2097                         if (stop_watch <= 0) {
2098                                 fprintf(stderr,
2099                                         "Invalid stopwatch value (%d secs)\n",
2100                                         stop_watch);
2101                                 return 1;
2102                         }
2103                         break;
2104                 case 'V':
2105                 case 'v':
2106                         printf("%s version %s\n", argv[0], blktrace_version);
2107                         exit(0);
2108                         /*NOTREACHED*/
                case 'b':
                        buf_size = strtoul(optarg, NULL, 10);
                        if (buf_size == 0 || buf_size > 16*1024) {
                                fprintf(stderr, "Invalid buffer size (%lu)\n",
                                        buf_size);
                                return 1;
                        }
                        buf_size <<= 10;
                        break;
                case 'n':
                        buf_nr = strtoul(optarg, NULL, 10);
                        if (buf_nr == 0) {
                                fprintf(stderr,
                                        "Invalid buffer nr (%lu)\n", buf_nr);
                                return 1;
                        }
                        break;
2126                 case 'D':
2127                         output_dir = optarg;
2128                         break;
2129                 case 'h':
2130                         net_mode = Net_client;
                        strncpy(hostname, optarg, sizeof(hostname) - 1);
                        hostname[sizeof(hostname) - 1] = '\0';
2132                         break;
2133                 case 'l':
2134                         net_mode = Net_server;
2135                         break;
2136                 case 'p':
2137                         net_port = atoi(optarg);
2138                         break;
2139                 case 's':
2140                         net_use_sendfile = 0;
2141                         break;
2142                 default:
2143                         show_usage(argv[0]);
2144                         exit(1);
2145                         /*NOTREACHED*/
2146                 }
2147         }
2148
2149         while (optind < argc)
2150                 if (add_devpath(argv[optind++]) != 0)
2151                         return 1;
2152
2153         if (net_mode != Net_server && ndevs == 0) {
2154                 show_usage(argv[0]);
2155                 return 1;
2156         }
2157
2158         if (statfs(debugfs_path, &st) < 0 || st.f_type != (long)DEBUGFS_TYPE) {
2159                 fprintf(stderr, "Invalid debug path %s: %d/%s\n",
2160                         debugfs_path, errno, strerror(errno));
2161                 return 1;
2162         }
2163
2164         if (act_mask_tmp != 0)
2165                 act_mask = act_mask_tmp;
2166
2167         if (net_mode == Net_client && net_setup_addr())
2168                 return 1;
2169
2170         /*
2171          * Set up for appropriate PFD handler based upon output name.
2172          */
2173         if (net_client_use_sendfile())
2174                 handle_pfds = handle_pfds_netclient;
2175         else if (net_client_use_send())
2176                 handle_pfds = handle_pfds_entries;
2177         else if (output_name && (strcmp(output_name, "-") == 0)) {
2178                 piped_output = 1;
2179                 handle_pfds = handle_pfds_entries;
2180                 pfp = stdout;
2181                 setvbuf(pfp, NULL, _IONBF, 0);
2182         } else
2183                 handle_pfds = handle_pfds_file;
2184         return 0;
2185 }
2186
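/*
 * Server-side connection bookkeeping: every accepted fd is linked onto
 * both the per-host and per-server connection lists, and the server's
 * pollfd array is resized to connects + 1, slot 0 being reserved for
 * the listen fd (see net_setup_pfds() below).
 */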
2187 static void ch_add_connection(struct net_server_s *ns, struct cl_host *ch,
2188                               int fd)
2189 {
2190         struct cl_conn *nc;
2191
2192         nc = malloc(sizeof(*nc));
2193         memset(nc, 0, sizeof(*nc));
2194
2195         time(&nc->connect_time);
2196         nc->ch = ch;
2197         nc->fd = fd;
2198         nc->ncpus = -1;
2199
2200         list_add_tail(&nc->ch_head, &ch->conn_list);
2201         ch->connects++;
2202
2203         list_add_tail(&nc->ns_head, &ns->conn_list);
2204         ns->connects++;
2205         ns->pfds = realloc(ns->pfds, (ns->connects+1) * sizeof(struct pollfd));
2206 }
2207
2208 static void ch_rem_connection(struct net_server_s *ns, struct cl_host *ch,
2209                               struct cl_conn *nc)
2210 {
2211         net_close_connection(&nc->fd);
2212
2213         list_del(&nc->ch_head);
2214         ch->connects--;
2215
2216         list_del(&nc->ns_head);
2217         ns->connects--;
2218         ns->pfds = realloc(ns->pfds, (ns->connects+1) * sizeof(struct pollfd));
2219
2220         free(nc);
2221 }
2222
2223 static struct cl_host *net_find_client_host(struct net_server_s *ns,
2224                                             struct in_addr cl_in_addr)
2225 {
2226         struct list_head *p;
2227
2228         __list_for_each(p, &ns->ch_list) {
2229                 struct cl_host *ch = list_entry(p, struct cl_host, head);
2230
2231                 if (in_addr_eq(ch->cl_in_addr, cl_in_addr))
2232                         return ch;
2233         }
2234
2235         return NULL;
2236 }
2237
2238 static struct cl_host *net_add_client_host(struct net_server_s *ns,
2239                                            struct sockaddr_in *addr)
2240 {
2241         struct cl_host *ch;
2242
2243         ch = malloc(sizeof(*ch));
2244         memset(ch, 0, sizeof(*ch));
2245
2246         ch->ns = ns;
2247         ch->cl_in_addr = addr->sin_addr;
2248         list_add_tail(&ch->head, &ns->ch_list);
2249         ns->nchs++;
2250
2251         ch->hostname = strdup(inet_ntoa(addr->sin_addr));
2252         printf("server: connection from %s\n", ch->hostname);
2253
2254         INIT_LIST_HEAD(&ch->conn_list);
2255         INIT_LIST_HEAD(&ch->devpaths);
2256
2257         return ch;
2258 }
2259
2260 static void device_done(struct devpath *dpp, int ncpus)
2261 {
2262         int cpu;
2263         struct io_info *iop;
2264
2265         for (cpu = 0, iop = dpp->ios; cpu < ncpus; cpu++, iop++)
2266                 close_iop(iop);
2267
2268         list_del(&dpp->head);
2269         dpp_free(dpp);
2270 }
2271
2272 static void net_ch_remove(struct cl_host *ch, int ncpus)
2273 {
2274         struct list_head *p, *q;
2275         struct net_server_s *ns = ch->ns;
2276
2277         list_for_each_safe(p, q, &ch->devpaths) {
2278                 struct devpath *dpp = list_entry(p, struct devpath, head);
2279                 device_done(dpp, ncpus);
2280         }
2281
2282         list_for_each_safe(p, q, &ch->conn_list) {
2283                 struct cl_conn *nc = list_entry(p, struct cl_conn, ch_head);
2284
2285                 ch_rem_connection(ns, ch, nc);
2286         }
2287
2288         list_del(&ch->head);
2289         ns->nchs--;
2290
2291         if (ch->hostname)
2292                 free(ch->hostname);
2293         free(ch);
2294 }
2295
2296 static void net_add_connection(struct net_server_s *ns)
2297 {
2298         int fd;
2299         struct cl_host *ch;
2300         socklen_t socklen = sizeof(ns->addr);
2301
2302         fd = my_accept(ns->listen_fd, (struct sockaddr *)&ns->addr, &socklen);
2303         if (fd < 0) {
2304                 /*
2305                  * This is OK: we just won't accept this connection,
2306                  * nothing fatal.
2307                  */
2308                 perror("accept");
2309         } else {
2310                 ch = net_find_client_host(ns, ns->addr.sin_addr);
2311                 if (!ch)
2312                         ch = net_add_client_host(ns, &ns->addr);
2313
2314                 ch_add_connection(ns, ch, fd);
2315         }
2316 }
2317
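/*
 * nc_add_dpp() - Server side: instantiate a devpath for a device
 * announced by a client, sizing the per-CPU stats and io_info arrays
 * from the client's reported CPU count and opening one output per CPU.
 */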
2318 static struct devpath *nc_add_dpp(struct cl_conn *nc,
2319                                   struct blktrace_net_hdr *bnh,
2320                                   time_t connect_time)
2321 {
2322         int cpu;
2323         struct io_info *iop;
2324         struct devpath *dpp;
2325
2326         dpp = malloc(sizeof(*dpp));
2327         memset(dpp, 0, sizeof(*dpp));
2328
2329         dpp->buts_name = strdup(bnh->buts_name);
2330         dpp->path = strdup(bnh->buts_name);
2331         dpp->fd = -1;
2332         dpp->ch = nc->ch;
2333         dpp->cl_id = bnh->cl_id;
2334         dpp->cl_connect_time = connect_time;
2335         dpp->ncpus = nc->ncpus;
        dpp->stats = calloc(dpp->ncpus, sizeof(*dpp->stats));
2338
2339         list_add_tail(&dpp->head, &nc->ch->devpaths);
2340         nc->ch->ndevs++;
2341
        dpp->ios = calloc(nc->ncpus, sizeof(*iop));
2344
2345         for (cpu = 0, iop = dpp->ios; cpu < nc->ncpus; cpu++, iop++) {
2346                 iop->dpp = dpp;
2347                 iop->nc = nc;
2348                 init_mmap_info(&iop->mmap_info);
2349
2350                 if (iop_open(iop, cpu))
2351                         goto err;
2352         }
2353
2354         return dpp;
2355
2356 err:
2357         /*
2358          * Need to unravel what's been done...
2359          */
2360         while (cpu >= 0)
2361                 close_iop(&dpp->ios[cpu--]);
2362         dpp_free(dpp);
2363
2364         return NULL;
2365 }
2366
2367 static struct devpath *nc_find_dpp(struct cl_conn *nc,
2368                                    struct blktrace_net_hdr *bnh)
2369 {
2370         struct list_head *p;
2371         time_t connect_time = nc->connect_time;
2372
2373         __list_for_each(p, &nc->ch->devpaths) {
2374                 struct devpath *dpp = list_entry(p, struct devpath, head);
2375
2376                 if (!strcmp(dpp->buts_name, bnh->buts_name))
2377                         return dpp;
2378
2379                 if (dpp->cl_id == bnh->cl_id)
2380                         connect_time = dpp->cl_connect_time;
2381         }
2382
2383         return nc_add_dpp(nc, bnh, connect_time);
2384 }
2385
2386 static void net_client_read_data(struct cl_conn *nc, struct devpath *dpp,
2387                                  struct blktrace_net_hdr *bnh)
2388 {
        int ret;
        struct io_info *iop;
        struct mmap_info *mip;

        /*
         * Guard the CPU index from the wire before using it: a bogus
         * value would index past the end of dpp->ios.
         */
        if (bnh->cpu >= (unsigned int)dpp->ncpus) {
                fprintf(stderr, "ncd(%s:%d): bad CPU index %u (ncpus %d)\n",
                        nc->ch->hostname, nc->fd,
                        (unsigned int)bnh->cpu, dpp->ncpus);
                exit(1);
        }
        iop = &dpp->ios[bnh->cpu];
        mip = &iop->mmap_info;
2392
2393         if (setup_mmap(iop->ofd, bnh->len, &iop->mmap_info, NULL)) {
2394                 fprintf(stderr, "ncd(%s:%d): mmap failed\n",
2395                         nc->ch->hostname, nc->fd);
2396                 exit(1);
2397         }
2398
2399         ret = net_recv_data(nc->fd, mip->fs_buf + mip->fs_off, bnh->len);
2400         if (ret > 0) {
2401                 pdc_dr_update(dpp, bnh->cpu, ret);
2402                 mip->fs_size += ret;
2403                 mip->fs_off += ret;
2404         } else if (ret < 0)
2405                 exit(1);
2406 }
2407
2408 /*
2409  * Returns 1 if we closed a host - invalidates other polling information
2410  * that may be present.
2411  */
2412 static int net_client_data(struct cl_conn *nc)
2413 {
2414         int ret;
2415         struct devpath *dpp;
2416         struct blktrace_net_hdr bnh;
2417
2418         ret = net_get_header(nc, &bnh);
2419         if (ret == 0)
2420                 return 0;
2421
2422         if (ret < 0) {
2423                 fprintf(stderr, "ncd(%d): header read failed\n", nc->fd);
2424                 exit(1);
2425         }
2426
2427         if (data_is_native == -1 && check_data_endianness(bnh.magic)) {
2428                 fprintf(stderr, "ncd(%d): received data is bad\n", nc->fd);
2429                 exit(1);
2430         }
2431
2432         if (!data_is_native) {
2433                 bnh.magic = be32_to_cpu(bnh.magic);
2434                 bnh.cpu = be32_to_cpu(bnh.cpu);
2435                 bnh.max_cpus = be32_to_cpu(bnh.max_cpus);
2436                 bnh.len = be32_to_cpu(bnh.len);
2437                 bnh.cl_id = be32_to_cpu(bnh.cl_id);
2438                 bnh.buf_size = be32_to_cpu(bnh.buf_size);
2439                 bnh.buf_nr = be32_to_cpu(bnh.buf_nr);
2440                 bnh.page_size = be32_to_cpu(bnh.page_size);
2441         }
2442
2443         if ((bnh.magic & 0xffffff00) != BLK_IO_TRACE_MAGIC) {
2444                 fprintf(stderr, "ncd(%s:%d): bad data magic\n",
2445                         nc->ch->hostname, nc->fd);
2446                 exit(1);
2447         }
2448
2449         if (nc->ncpus == -1)
2450                 nc->ncpus = bnh.max_cpus;
2451
        /*
         * len == 0 means the other end is sending us a new connection/dpp
         * len == 1 means that the other end signalled end-of-run
         * any other len is a data payload of that many bytes
         */
2456         dpp = nc_find_dpp(nc, &bnh);
2457         if (bnh.len == 0) {
2458                 /*
2459                  * Just adding in the dpp above is enough
2460                  */
2461                 ack_open_close(nc->fd, dpp->buts_name);
2462                 nc->ch->cl_opens++;
2463         } else if (bnh.len == 1) {
2464                 /*
2465                  * overload cpu count with dropped events
2466                  */
2467                 dpp->drops = bnh.cpu;
2468
2469                 ack_open_close(nc->fd, dpp->buts_name);
2470                 if (--nc->ch->cl_opens == 0) {
2471                         show_stats(&nc->ch->devpaths);
2472                         net_ch_remove(nc->ch, nc->ncpus);
2473                         return 1;
2474                 }
2475         } else
2476                 net_client_read_data(nc, dpp, &bnh);
2477
2478         return 0;
2479 }
2480
2481 static void handle_client_data(struct net_server_s *ns, int events)
2482 {
2483         struct cl_conn *nc;
2484         struct pollfd *pfd;
2485         struct list_head *p, *q;
2486
2487         pfd = &ns->pfds[1];
2488         list_for_each_safe(p, q, &ns->conn_list) {
2489                 if (pfd->revents & POLLIN) {
2490                         nc = list_entry(p, struct cl_conn, ns_head);
2491
2492                         if (net_client_data(nc) || --events == 0)
2493                                 break;
2494                 }
2495                 pfd++;
2496         }
2497 }
2498
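/*
 * net_setup_pfds() - Rebuild the server poll set: slot 0 is the listen
 * socket, followed by one slot per live client connection.
 */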
2499 static void net_setup_pfds(struct net_server_s *ns)
2500 {
2501         struct pollfd *pfd;
2502         struct list_head *p;
2503
2504         ns->pfds[0].fd = ns->listen_fd;
2505         ns->pfds[0].events = POLLIN;
2506
2507         pfd = &ns->pfds[1];
2508         __list_for_each(p, &ns->conn_list) {
2509                 struct cl_conn *nc = list_entry(p, struct cl_conn, ns_head);
2510
2511                 pfd->fd = nc->fd;
2512                 pfd->events = POLLIN;
2513                 pfd++;
2514         }
2515 }
2516
2517 static int net_server_handle_connections(struct net_server_s *ns)
2518 {
2519         int events;
2520
2521         printf("server: waiting for connections...\n");
2522
2523         while (!done) {
2524                 net_setup_pfds(ns);
2525                 events = poll(ns->pfds, ns->connects + 1, -1);
2526                 if (events < 0) {
2527                         if (errno != EINTR) {
2528                                 perror("FATAL: poll error");
2529                                 return 1;
2530                         }
2531                 } else if (events > 0) {
2532                         if (ns->pfds[0].revents & POLLIN) {
2533                                 net_add_connection(ns);
2534                                 events--;
2535                         }
2536
2537                         if (events)
2538                                 handle_client_data(ns, events);
2539                 }
2540         }
2541
2542         return 0;
2543 }
2544
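/*
 * net_server() - Bind a TCP listener on INADDR_ANY:net_port and run
 * the accept/poll loop until done. A sketch of the matching pairing,
 * with the host name purely illustrative:
 *
 *	blktrace -l				(on the collecting host)
 *	blktrace -d /dev/sda -h collector	(on the traced machine)
 */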
2545 static int net_server(void)
2546 {
2547         int fd, opt;
2548         int ret = 1;
2549         struct net_server_s net_server;
2550         struct net_server_s *ns = &net_server;
2551
2552         memset(ns, 0, sizeof(*ns));
2553         INIT_LIST_HEAD(&ns->ch_list);
2554         INIT_LIST_HEAD(&ns->conn_list);
2555         ns->pfds = malloc(sizeof(struct pollfd));
2556
2557         fd = my_socket(AF_INET, SOCK_STREAM, 0);
2558         if (fd < 0) {
2559                 perror("server: socket");
2560                 goto out;
2561         }
2562
2563         opt = 1;
2564         if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) {
2565                 perror("setsockopt");
2566                 goto out;
2567         }
2568
2569         memset(&ns->addr, 0, sizeof(ns->addr));
2570         ns->addr.sin_family = AF_INET;
2571         ns->addr.sin_addr.s_addr = htonl(INADDR_ANY);
2572         ns->addr.sin_port = htons(net_port);
2573
2574         if (bind(fd, (struct sockaddr *) &ns->addr, sizeof(ns->addr)) < 0) {
2575                 perror("bind");
2576                 goto out;
2577         }
2578
2579         if (listen(fd, 1) < 0) {
2580                 perror("listen");
2581                 goto out;
2582         }
2583
2584         /*
2585          * The actual server looping is done here:
2586          */
2587         ns->listen_fd = fd;
2588         ret = net_server_handle_connections(ns);
2589
2590         /*
2591          * Clean up and return...
2592          */
2593 out:
2594         free(ns->pfds);
2595         return ret;
2596 }
2597
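/*
 * run_tracers() - Normal (local or network client) operation: set up
 * the kernel side of the trace, start one tracer thread per CPU, and
 * only unblock them (and arm the optional stop watch) once every
 * thread has come up; otherwise the run is torn down.
 */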
2598 static int run_tracers(void)
2599 {
2600         atexit(exit_tracing);
2601         if (net_mode == Net_client)
2602                 printf("blktrace: connecting to %s\n", hostname);
2603
2604         setup_buts();
2605
2606         if (use_tracer_devpaths()) {
2607                 if (setup_tracer_devpaths())
2608                         return 1;
2609
2610                 if (piped_output)
2611                         handle_list = handle_list_file;
2612                 else
2613                         handle_list = handle_list_net;
2614         }
2615
2616         start_tracers();
2617         if (nthreads_running == ncpus) {
2618                 unblock_tracers();
2619                 start_buts();
2620                 if (net_mode == Net_client)
2621                         printf("blktrace: connected!\n");
2622                 if (stop_watch)
2623                         alarm(stop_watch);
2624         } else
2625                 stop_tracers();
2626
2627         wait_tracers();
2628         if (nthreads_running == ncpus)
2629                 show_stats(&devpaths);
2630         if (net_client_use_send())
2631                 close_client_connections();
2632         del_tracers();
2633
2634         return 0;
2635 }
2636
2637 int main(int argc, char *argv[])
2638 {
2639         int ret = 0;
2640
2641         setlocale(LC_NUMERIC, "en_US");
2642         pagesize = getpagesize();
2643         ncpus = sysconf(_SC_NPROCESSORS_ONLN);
2644         if (ncpus < 0) {
2645                 fprintf(stderr, "sysconf(_SC_NPROCESSORS_ONLN) failed %d/%s\n",
2646                         errno, strerror(errno));
2647                 ret = 1;
2648                 goto out;
2649         } else if (handle_args(argc, argv)) {
2650                 ret = 1;
2651                 goto out;
2652         }
2653
2654         if (ndevs > 1 && output_name && strcmp(output_name, "-") != 0) {
2655                 fprintf(stderr, "-o not supported with multiple devices\n");
2656                 ret = 1;
2657                 goto out;
2658         }
2659
2660         signal(SIGINT, handle_sigint);
2661         signal(SIGHUP, handle_sigint);
2662         signal(SIGTERM, handle_sigint);
2663         signal(SIGALRM, handle_sigint);
2664         signal(SIGPIPE, SIG_IGN);
2665
2666         if (kill_running_trace) {
2667                 struct devpath *dpp;
2668                 struct list_head *p;
2669
2670                 __list_for_each(p, &devpaths) {
2671                         dpp = list_entry(p, struct devpath, head);
2672                         if (__stop_trace(dpp->fd)) {
2673                                 fprintf(stderr,
2674                                         "BLKTRACETEARDOWN %s failed: %d/%s\n",
2675                                         dpp->path, errno, strerror(errno));
2676                         }
2677                 }
2678         } else if (net_mode == Net_server) {
2679                 if (output_name) {
2680                         fprintf(stderr, "-o ignored in server mode\n");
2681                         output_name = NULL;
2682                 }
2683                 ret = net_server();
2684         } else
2685                 ret = run_tracers();
2686
2687 out:
2688         if (pfp)
2689                 fclose(pfp);
2690         rel_devpaths();
2691         return ret;
2692 }