/*
 * block queue tracing application
 *
 * Copyright (C) 2005 Jens Axboe <axboe@suse.de>
 * Copyright (C) 2006 Jens Axboe <axboe@kernel.dk>
 *
 * Rewrite to have a single thread per CPU (managing all devices on that CPU)
 *      Alan D. Brunelle <alan.brunelle@hp.com> - January 2009
 *
 *  This program is free software; you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License as published by
 *  the Free Software Foundation; either version 2 of the License, or
 *  (at your option) any later version.
 *
 *  This program is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *  GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License
 *  along with this program; if not, write to the Free Software
 *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 *
 */

#include <errno.h>
#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <fcntl.h>
#include <getopt.h>
#include <sched.h>
#include <unistd.h>
#include <poll.h>
#include <signal.h>
#include <pthread.h>
#include <locale.h>
#include <sys/ioctl.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/vfs.h>
#include <sys/mman.h>
#include <sys/param.h>
#include <sys/time.h>
#include <sys/resource.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <sys/sendfile.h>

#include "btt/list.h"
#include "blktrace.h"

/*
 * You may want to increase this even more, if you are logging at a high
 * rate and see skipped/missed events
 */
#define BUF_SIZE                (512 * 1024)
#define BUF_NR                  (4)

#define FILE_VBUF_SIZE          (128 * 1024)

#define DEBUGFS_TYPE            (0x64626720)
#define TRACE_NET_PORT          (8462)

enum {
        Net_none = 0,
        Net_server,
        Net_client,
};

enum thread_status {
        Th_running,
        Th_leaving,
        Th_error
};

/*
 * Generic stats collected: nevents can be _roughly_ estimated by data_read
 * (discounting pdu...)
 *
 * These fields are updated w/ pdc_dr_update & pdc_nev_update below.
 */
struct pdc_stats {
        unsigned long long data_read;
        unsigned long long nevents;
};
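
/*
 * Illustrative only: if records carried no pdu payload, the event count
 * could be estimated as data_read / sizeof(struct blk_io_trace) (48 bytes
 * with the canonical record layout). The real count is maintained via
 * pdc_nev_update() in handle_list_file() below, which walks each
 * record's pdu_len.
 */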

struct devpath {
        struct list_head head;
        char *path;                     /* path to device special file */
        char *buts_name;                /* name returned from bt kernel code */
        struct pdc_stats *stats;
        int fd, idx, ncpus;
        unsigned long long drops;

        /*
         * For piped output only:
         *
         * Each tracer will have a tracer_devpath_head that it will add new
         * data onto. Its list is protected by tracer_devpath_head.mutex,
         * and it will signal the processing thread using the dp_cond,
         * dp_mutex & dp_entries variables below.
         */
        struct tracer_devpath_head *heads;

        /*
         * For network server mode only:
         */
        struct cl_host *ch;
        u32 cl_id;
        time_t cl_connect_time;
        struct io_info *ios;
};

/*
 * For piped output to stdout we will have each tracer thread (one per dev)
 * tack buffers read from the relay queues onto a per-device list.
 *
 * The main thread will then collect trace buffers from each of the lists
 * in turn.
 *
 * We will use a mutex to guard each of the trace_buf lists. The tracers
 * can then signal the main thread using <dp_cond,dp_mutex> and
 * dp_entries. (When dp_entries is 0 and a tracer adds an entry, it will
 * signal. When dp_entries is 0, the main thread will wait for that
 * condition to be signalled.) A sketch of this handoff follows the two
 * struct definitions below.
 *
 * adb: It may be better just to have a large buffer per tracer per dev,
 * and then use it as a ring-buffer. This would certainly cut down a lot
 * of malloc/free thrashing, at the cost of more memory movements
 * (potentially).
 */
struct trace_buf {
        struct list_head head;
        struct devpath *dpp;
        void *buf;
        int cpu, len;
};

struct tracer_devpath_head {
        pthread_mutex_t mutex;
        struct list_head head;
        struct trace_buf *prev;
};
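
/*
 * Sketch of the handoff described above, using the helpers defined later
 * in this file (add_trace_buf(), incr_entries(), wait_empty_entries(),
 * decr_entries()); illustrative, not a separate code path:
 *
 *      Tracer thread (producer):          Main thread (consumer):
 *        lock hd->mutex                     wait_empty_entries():
 *        list_add_tail(&tbp->head, ...)       lock dp_mutex
 *        unlock hd->mutex                     while (!done && !dp_entries)
 *        incr_entries(n):                       wait on dp_cond
 *          lock dp_mutex                      unlock dp_mutex
 *          if (dp_entries == 0)             __process_trace_bufs():
 *            signal dp_cond                   splice each list, handle bufs
 *          dp_entries += n                    decr_entries(handled)
 *          unlock dp_mutex
 */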

/*
 * Used to handle the mmap() interfaces for output file (containing traces)
 * (a worked example of the offset arithmetic follows setup_mmap() below)
 */
struct mmap_info {
        void *fs_buf;
        unsigned long long fs_size, fs_max_size, fs_off, fs_buf_len;
        unsigned long buf_size, buf_nr;
        int pagesize;
};

/*
 * Each thread doing work on a (client) side of blktrace will have one
 * of these. The ios array contains input/output information, pfds holds
 * poll() data. The volatiles provide flags to/from the main executing
 * thread.
 */
struct tracer {
        struct list_head head;
        struct io_info *ios;
        struct pollfd *pfds;
        pthread_t thread;
        int cpu, nios;
        volatile int status, is_done;
};

/*
 * Networking stuff follows. We include a magic number so we know whether
 * to do endianness conversion or not.
 *
 * The len field is overloaded:
 *      0 - Indicates an "open" - allowing the server to set up for a dev/cpu
 *      1 - Indicates a "close" - Shut down the connection in an orderly manner
 *
 * The cpu field is overloaded on close: it will contain the number of drops.
 */
struct blktrace_net_hdr {
        u32 magic;              /* same as trace magic */
        char buts_name[32];     /* trace name */
        u32 cpu;                /* for which cpu */
        u32 max_cpus;
        u32 len;                /* length of following trace data */
        u32 cl_id;              /* id for set of client per-cpu connections */
        u32 buf_size;           /* client buf_size for this trace */
        u32 buf_nr;             /* client buf_nr for this trace */
        u32 page_size;          /* client page_size for this trace */
};
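
/*
 * Illustrative example of the exchange for one per-cpu connection tracing
 * device "sda" (see net_send_open/close, ack_open_close and
 * flush_subbuf_net below):
 *
 *      client -> server   hdr { buts_name "sda", cpu 0, len 0 }      open
 *      server -> client   hdr { buts_name "sda", len 2 }             ack
 *      client -> server   hdr { ..., len N } + N bytes of trace data
 *      ...                (repeated for each flushed sub-buffer)
 *      client -> server   hdr { buts_name "sda", cpu <drops>, len 1 } close
 *      server -> client   hdr { buts_name "sda", len 2 }             ack
 */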

/*
 * Each host encountered has one of these. The head is used to link this
 * on to the network server's ch_list. Connections associated with this
 * host are linked on conn_list, and any devices traced on that host
 * are connected on the devpaths list.
 */
struct cl_host {
        struct list_head head;
        struct list_head conn_list;
        struct list_head devpaths;
        struct net_server_s *ns;
        char *hostname;
        struct in_addr cl_in_addr;
        int connects, ndevs, cl_opens;
};

/*
 * Each connection (client to server socket ('fd')) has one of these. A
 * back reference to the host ('ch'), and list heads (for the host
 * list, and the network server conn_list) are also included.
 */
struct cl_conn {
        struct list_head ch_head, ns_head;
        struct cl_host *ch;
        int fd, ncpus;
        time_t connect_time;
};

/*
 * The network server requires some poll structures to be maintained -
 * one per connection currently on conn_list. The nchs/ch_list values
 * are for each host connected to this server. The addr field is used
 * for scratch as new connections are established.
 */
struct net_server_s {
        struct list_head conn_list;
        struct list_head ch_list;
        struct pollfd *pfds;
        int listen_fd, connects, nchs;
        struct sockaddr_in addr;
};

/*
 * This structure is (generically) used to provide information
 * for a read-to-write set of values.
 *
 * ifn & ifd represent input information
 *
 * ofn, ofd, ofp, obuf & mmap_info are used for the output file (optionally).
 */
struct io_info {
        struct devpath *dpp;
        FILE *ofp;
        char *obuf;
        struct cl_conn *nc;     /* Server network connection */

        /*
         * mmap controlled output files
         */
        struct mmap_info mmap_info;

        /*
         * Client network fields
         */
        unsigned int ready;
        unsigned long long data_queued;

        /*
         * Input/output file descriptors & names
         */
        int ifd, ofd;
        char ifn[MAXPATHLEN + 64];
        char ofn[MAXPATHLEN + 64];
};

static char blktrace_version[] = "2.0.0";

/*
 * Linkage to blktrace helper routines (trace conversions)
 */
int data_is_native = -1;

static int ndevs;
static int ncpus;
static int pagesize;
static int act_mask = ~0U;
static int kill_running_trace;
static int stop_watch;
static int piped_output;

static char *debugfs_path = "/sys/kernel/debug";
static char *output_name;
static char *output_dir;

static unsigned long buf_size = BUF_SIZE;
static unsigned long buf_nr = BUF_NR;

static FILE *pfp;

static LIST_HEAD(devpaths);
static LIST_HEAD(tracers);

static volatile int done;

/*
 * Tracer threads add entries, the main thread takes them off and processes
 * them. These protect the dp_entries variable.
 */
static pthread_cond_t dp_cond = PTHREAD_COND_INITIALIZER;
static pthread_mutex_t dp_mutex = PTHREAD_MUTEX_INITIALIZER;
static volatile int dp_entries;

/*
 * These synchronize master / thread interactions (the full start/stop
 * sequence is sketched after wait_tracers_leaving() below).
 */
static pthread_cond_t mt_cond = PTHREAD_COND_INITIALIZER;
static pthread_mutex_t mt_mutex = PTHREAD_MUTEX_INITIALIZER;
static volatile int nthreads_running;
static volatile int nthreads_leaving;
static volatile int nthreads_error;
static volatile int tracers_run;

/*
 * network cmd line params
 */
static struct sockaddr_in hostname_addr;
static char hostname[MAXHOSTNAMELEN];
static int net_port = TRACE_NET_PORT;
static int net_use_sendfile = 1;
static int net_mode;
static int *cl_fds;

static int (*handle_pfds)(struct tracer *, int, int);
static int (*handle_list)(struct tracer_devpath_head *, struct list_head *);

#define S_OPTS  "d:a:A:r:o:kw:vVb:n:D:lh:p:sI:"
static struct option l_opts[] = {
        {
                .name = "dev",
                .has_arg = required_argument,
                .flag = NULL,
                .val = 'd'
        },
        {
                .name = "input-devs",
                .has_arg = required_argument,
                .flag = NULL,
                .val = 'I'
        },
        {
                .name = "act-mask",
                .has_arg = required_argument,
                .flag = NULL,
                .val = 'a'
        },
        {
                .name = "set-mask",
                .has_arg = required_argument,
                .flag = NULL,
                .val = 'A'
        },
        {
                .name = "relay",
                .has_arg = required_argument,
                .flag = NULL,
                .val = 'r'
        },
        {
                .name = "output",
                .has_arg = required_argument,
                .flag = NULL,
                .val = 'o'
        },
        {
                .name = "kill",
                .has_arg = no_argument,
                .flag = NULL,
                .val = 'k'
        },
        {
                .name = "stopwatch",
                .has_arg = required_argument,
                .flag = NULL,
                .val = 'w'
        },
        {
                .name = "version",
                .has_arg = no_argument,
                .flag = NULL,
                .val = 'v'
        },
        {
                .name = "version",
                .has_arg = no_argument,
                .flag = NULL,
                .val = 'V'
        },
        {
                .name = "buffer-size",
                .has_arg = required_argument,
                .flag = NULL,
                .val = 'b'
        },
        {
                .name = "num-sub-buffers",
                .has_arg = required_argument,
                .flag = NULL,
                .val = 'n'
        },
        {
                .name = "output-dir",
                .has_arg = required_argument,
                .flag = NULL,
                .val = 'D'
        },
        {
                .name = "listen",
                .has_arg = no_argument,
                .flag = NULL,
                .val = 'l'
        },
        {
                .name = "host",
                .has_arg = required_argument,
                .flag = NULL,
                .val = 'h'
        },
        {
                .name = "port",
                .has_arg = required_argument,
                .flag = NULL,
                .val = 'p'
        },
        {
                .name = "no-sendfile",
                .has_arg = no_argument,
                .flag = NULL,
                .val = 's'
        },
        {
                .name = NULL,
        }
};

static char usage_str[] = \
        "-d <dev> [ -r debugfs path ] [ -o <output> ] [ -k ] [ -w time ]\n" \
        "[ -a action ] [ -A action mask ] [ -I <devs file> ] [ -v ]\n\n" \
        "\t-d Use specified device. May also be given last after options\n" \
        "\t-r Path to mounted debugfs, defaults to /sys/kernel/debug\n" \
        "\t-o File(s) to send output to\n" \
        "\t-D Directory to prepend to output file names\n" \
        "\t-k Kill a running trace\n" \
        "\t-w Stop after defined time, in seconds\n" \
        "\t-a Only trace specified actions. See documentation\n" \
        "\t-A Give trace mask as a single value. See documentation\n" \
        "\t-b Sub buffer size in KiB\n" \
        "\t-n Number of sub buffers\n" \
        "\t-l Run in network listen mode (blktrace server)\n" \
        "\t-h Run in network client mode, connecting to the given host\n" \
        "\t-p Network port to use (default 8462)\n" \
        "\t-s Make the network client NOT use sendfile() to transfer data\n" \
        "\t-I Add devices found in <devs file>\n" \
        "\t-V Print program version info\n\n";
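
/*
 * Example invocations (illustrative):
 *
 *      blktrace -d /dev/sda -o sda -w 30    trace sda for 30 seconds
 *      blktrace -d /dev/sda -o -            pipe binary events to stdout
 *      blktrace -l                          run as a network server
 *      blktrace -d /dev/sda -h <host>       ship traces to a remote server
 */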

static void clear_events(struct pollfd *pfd)
{
        pfd->events = 0;
        pfd->revents = 0;
}

static inline int net_client_use_sendfile(void)
{
        return net_mode == Net_client && net_use_sendfile;
}

static inline int net_client_use_send(void)
{
        return net_mode == Net_client && !net_use_sendfile;
}

static inline int use_tracer_devpaths(void)
{
        return piped_output || net_client_use_send();
}

static inline int in_addr_eq(struct in_addr a, struct in_addr b)
{
        return a.s_addr == b.s_addr;
}

static inline void pdc_dr_update(struct devpath *dpp, int cpu, int data_read)
{
        dpp->stats[cpu].data_read += data_read;
}

static inline void pdc_nev_update(struct devpath *dpp, int cpu, int nevents)
{
        dpp->stats[cpu].nevents += nevents;
}

static void show_usage(char *prog)
{
        fprintf(stderr, "Usage: %s %s %s", prog, blktrace_version, usage_str);
}

/*
 * Create a timespec 'msec' milliseconds into the future
 */
static inline void make_timespec(struct timespec *tsp, long delta_msec)
{
        struct timeval now;

        gettimeofday(&now, NULL);
        tsp->tv_sec = now.tv_sec;
        tsp->tv_nsec = 1000L * now.tv_usec;

        tsp->tv_nsec += (delta_msec * 1000000L);
        if (tsp->tv_nsec >= 1000000000L) {      /* tv_nsec must stay < 1e9 */
                long secs = tsp->tv_nsec / 1000000000L;

                tsp->tv_sec += secs;
                tsp->tv_nsec -= (secs * 1000000000L);
        }
}

/*
 * Add a timer to ensure wait ends
 */
static void t_pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)
{
        struct timespec ts;

        make_timespec(&ts, 50);
        pthread_cond_timedwait(cond, mutex, &ts);
}

static void unblock_tracers(void)
{
        pthread_mutex_lock(&mt_mutex);
        tracers_run = 1;
        pthread_cond_broadcast(&mt_cond);
        pthread_mutex_unlock(&mt_mutex);
}

static void tracer_wait_unblock(struct tracer *tp)
{
        pthread_mutex_lock(&mt_mutex);
        while (!tp->is_done && !tracers_run)
                pthread_cond_wait(&mt_cond, &mt_mutex);
        pthread_mutex_unlock(&mt_mutex);
}

static void tracer_signal_ready(struct tracer *tp,
                                enum thread_status th_status,
                                int status)
{
        pthread_mutex_lock(&mt_mutex);
        tp->status = status;

        if (th_status == Th_running)
                nthreads_running++;
        else if (th_status == Th_error)
                nthreads_error++;
        else
                nthreads_leaving++;

        pthread_cond_signal(&mt_cond);
        pthread_mutex_unlock(&mt_mutex);
}

static void wait_tracers_ready(int ncpus_started)
{
        pthread_mutex_lock(&mt_mutex);
        while ((nthreads_running + nthreads_error) < ncpus_started)
                t_pthread_cond_wait(&mt_cond, &mt_mutex);
        pthread_mutex_unlock(&mt_mutex);
}

static void wait_tracers_leaving(void)
{
        pthread_mutex_lock(&mt_mutex);
        while (nthreads_leaving < nthreads_running)
                t_pthread_cond_wait(&mt_cond, &mt_mutex);
        pthread_mutex_unlock(&mt_mutex);
}
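
/*
 * Start/stop sequence (as implemented below): the main thread spawns one
 * tracer per CPU via start_tracer(). Each tracer pins itself to its CPU,
 * opens its per-device trace files and calls
 * tracer_signal_ready(Th_running). The main thread sits in
 * wait_tracers_ready(), then sets tracers_run and broadcasts mt_cond
 * (unblock_tracers()) so all tracers begin reading at roughly the same
 * time. On shutdown each tracer signals Th_leaving, and the main thread
 * returns from wait_tracers_leaving().
 */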

static void init_mmap_info(struct mmap_info *mip)
{
        mip->buf_size = buf_size;
        mip->buf_nr = buf_nr;
        mip->pagesize = pagesize;
}

static void net_close_connection(int *fd)
{
        shutdown(*fd, SHUT_RDWR);
        close(*fd);
        *fd = -1;
}

static void dpp_free(struct devpath *dpp)
{
        if (dpp->stats)
                free(dpp->stats);
        if (dpp->ios)
                free(dpp->ios);
        if (dpp->path)
                free(dpp->path);
        if (dpp->buts_name)
                free(dpp->buts_name);
        free(dpp);
}

static int lock_on_cpu(int cpu)
{
        cpu_set_t cpu_mask;

        CPU_ZERO(&cpu_mask);
        CPU_SET(cpu, &cpu_mask);
        /*
         * pid 0 == the calling thread: this is invoked from each tracer
         * thread, so pin that thread (not the main one) to 'cpu'.
         */
        if (sched_setaffinity(0, sizeof(cpu_mask), &cpu_mask) < 0)
                return errno;

        return 0;
}

static int increase_limit(int resource, rlim_t increase)
{
        struct rlimit rlim;
        int save_errno = errno;

        if (!getrlimit(resource, &rlim)) {
                rlim.rlim_cur += increase;
                if (rlim.rlim_cur >= rlim.rlim_max)
                        rlim.rlim_max = rlim.rlim_cur + increase;

                if (!setrlimit(resource, &rlim))
                        return 1;
        }

        errno = save_errno;
        return 0;
}

static int handle_open_failure(void)
{
        if (errno == ENFILE || errno == EMFILE)
                return increase_limit(RLIMIT_NOFILE, 16);
        return 0;
}

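/*
 * The handle_*_failure() helpers try to raise the relevant resource
 * limit (RLIMIT_NOFILE or RLIMIT_MEMLOCK); the my_*() wrappers below
 * retry their underlying call for as long as that keeps succeeding.
 */
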
static int handle_mem_failure(size_t length)
{
        if (errno == ENFILE)
                return handle_open_failure();
        else if (errno == ENOMEM)
                return increase_limit(RLIMIT_MEMLOCK, 2 * length);
        return 0;
}

static FILE *my_fopen(const char *path, const char *mode)
{
        FILE *fp;

        do {
                fp = fopen(path, mode);
        } while (fp == NULL && handle_open_failure());

        return fp;
}

static int my_open(const char *path, int flags)
{
        int fd;

        do {
                fd = open(path, flags);
        } while (fd < 0 && handle_open_failure());

        return fd;
}

static int my_socket(int domain, int type, int protocol)
{
        int fd;

        do {
                fd = socket(domain, type, protocol);
        } while (fd < 0 && handle_open_failure());

        return fd;
}

static int my_accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen)
{
        int fd;

        do {
                fd = accept(sockfd, addr, addrlen);
        } while (fd < 0 && handle_open_failure());

        return fd;
}

static void *my_mmap(void *addr, size_t length, int prot, int flags, int fd,
                     off_t offset)
{
        void *new;

        do {
                new = mmap(addr, length, prot, flags, fd, offset);
        } while (new == MAP_FAILED && handle_mem_failure(length));

        return new;
}

static int my_mlock(const void *addr, size_t len)
{
        int ret;

        do {
                ret = mlock(addr, len);
        } while (ret < 0 && handle_mem_failure(len));

        return ret;
}

static int setup_mmap(int fd, unsigned int maxlen, struct mmap_info *mip)
{
        if (mip->fs_off + maxlen > mip->fs_buf_len) {
                unsigned long nr = max(16, mip->buf_nr);

                if (mip->fs_buf) {
                        munlock(mip->fs_buf, mip->fs_buf_len);
                        munmap(mip->fs_buf, mip->fs_buf_len);
                        mip->fs_buf = NULL;
                }

                mip->fs_off = mip->fs_size & (mip->pagesize - 1);
                mip->fs_buf_len = (nr * mip->buf_size) - mip->fs_off;
                mip->fs_max_size += mip->fs_buf_len;

                if (ftruncate(fd, mip->fs_max_size) < 0) {
                        perror("setup_mmap: ftruncate");
                        return 1;
                }

                mip->fs_buf = my_mmap(NULL, mip->fs_buf_len, PROT_WRITE,
                                      MAP_SHARED, fd,
                                      mip->fs_size - mip->fs_off);
                if (mip->fs_buf == MAP_FAILED) {
                        perror("setup_mmap: mmap");
                        return 1;
                }
                my_mlock(mip->fs_buf, mip->fs_buf_len);
        }

        return 0;
}
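
/*
 * Worked example for setup_mmap() (illustrative numbers: pagesize 4096,
 * buf_size 512 KiB, buf_nr <= 16 so nr == 16): suppose fs_size is
 * currently 10000 bytes. Then
 *
 *      fs_off      = 10000 & 4095        = 1808
 *      fs_buf_len  = 16 * 524288 - 1808  = 8386800
 *      mmap offset = fs_size - fs_off    = 8192    (page aligned)
 *
 * i.e. the new window is mapped from the last page boundary at or below
 * the current end of file, and subsequent read()s land at
 * fs_buf + fs_off.
 */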

static int __stop_trace(int fd)
{
        /*
         * Should be stopped, don't complain if it isn't
         */
        ioctl(fd, BLKTRACESTOP);
        return ioctl(fd, BLKTRACETEARDOWN);
}

static int write_data(char *buf, int len)
{
        int ret;

rewrite:
        ret = fwrite(buf, len, 1, pfp);
        if (ferror(pfp) || ret != 1) {
                if (errno == EINTR) {
                        clearerr(pfp);
                        goto rewrite;
                }

                if (!piped_output || (errno != EPIPE && errno != EBADF)) {
                        fprintf(stderr, "write(%d) failed: %d/%s\n",
                                len, errno, strerror(errno));
                }
                goto err;
        }

        fflush(pfp);
        return 0;

err:
        clearerr(pfp);
        return 1;
}

/*
 * Returns the number of bytes read (successfully)
 */
static int __net_recv_data(int fd, void *buf, unsigned int len)
{
        unsigned int bytes_left = len;

        while (bytes_left && !done) {
                int ret = recv(fd, buf, bytes_left, MSG_WAITALL);

                if (ret == 0)
                        break;
                else if (ret < 0) {
                        if (errno == EAGAIN) {
                                usleep(50);
                                continue;
                        }
                        perror("server: net_recv_data: recv failed");
                        break;
                } else {
                        buf += ret;
                        bytes_left -= ret;
                }
        }

        return len - bytes_left;
}

static int net_recv_data(int fd, void *buf, unsigned int len)
{
        return __net_recv_data(fd, buf, len);
}

/*
 * Returns number of bytes written
 */
static int net_send_data(int fd, void *buf, unsigned int buf_len)
{
        int ret;
        unsigned int bytes_left = buf_len;

        while (bytes_left) {
                ret = send(fd, buf, bytes_left, 0);
                if (ret < 0) {
                        perror("send");
                        break;
                }

                buf += ret;
                bytes_left -= ret;
        }

        return buf_len - bytes_left;
}

static int net_send_header(int fd, int cpu, char *buts_name, int len)
{
        struct blktrace_net_hdr hdr;

        memset(&hdr, 0, sizeof(hdr));

        hdr.magic = BLK_IO_TRACE_MAGIC;
        strncpy(hdr.buts_name, buts_name, sizeof(hdr.buts_name));
        hdr.buts_name[sizeof(hdr.buts_name) - 1] = '\0';
        hdr.cpu = cpu;
        hdr.max_cpus = ncpus;
        hdr.len = len;
        hdr.cl_id = getpid();
        hdr.buf_size = buf_size;
        hdr.buf_nr = buf_nr;
        hdr.page_size = pagesize;

        return net_send_data(fd, &hdr, sizeof(hdr)) != sizeof(hdr);
}

static void net_send_open_close(int fd, int cpu, char *buts_name, int len)
{
        struct blktrace_net_hdr ret_hdr;

        net_send_header(fd, cpu, buts_name, len);
        net_recv_data(fd, &ret_hdr, sizeof(ret_hdr));
}

static void net_send_open(int fd, int cpu, char *buts_name)
{
        net_send_open_close(fd, cpu, buts_name, 0);
}

static void net_send_close(int fd, char *buts_name, int drops)
{
        /*
         * Overload CPU w/ number of drops
         *
         * XXX: Need to clear/set done around call - done=1 (which
         * is true here) stops reads from happening... :-(
         */
        done = 0;
        net_send_open_close(fd, drops, buts_name, 1);
        done = 1;
}

static void ack_open_close(int fd, char *buts_name)
{
        net_send_header(fd, 0, buts_name, 2);
}

static void net_send_drops(int fd)
{
        struct list_head *p;

        __list_for_each(p, &devpaths) {
                struct devpath *dpp = list_entry(p, struct devpath, head);

                net_send_close(fd, dpp->buts_name, dpp->drops);
        }
}

/*
 * Returns:
 *       0: "EOF"
 *       1: OK
 *      -1: Error
 */
static int net_get_header(struct cl_conn *nc, struct blktrace_net_hdr *bnh)
{
        int bytes_read;
        int fl = fcntl(nc->fd, F_GETFL);

        fcntl(nc->fd, F_SETFL, fl | O_NONBLOCK);
        bytes_read = __net_recv_data(nc->fd, bnh, sizeof(*bnh));
        fcntl(nc->fd, F_SETFL, fl & ~O_NONBLOCK);

        if (bytes_read == sizeof(*bnh))
                return 1;
        else if (bytes_read == 0)
                return 0;
        else
                return -1;
}

static int net_setup_addr(void)
{
        struct sockaddr_in *addr = &hostname_addr;

        memset(addr, 0, sizeof(*addr));
        addr->sin_family = AF_INET;
        addr->sin_port = htons(net_port);

        if (inet_aton(hostname, &addr->sin_addr) != 1) {
                struct hostent *hent;
retry:
                hent = gethostbyname(hostname);
                if (!hent) {
                        if (h_errno == TRY_AGAIN) {
                                usleep(100);
                                goto retry;
                        } else if (h_errno == NO_RECOVERY) {
                                fprintf(stderr, "gethostbyname(%s): "
                                        "non-recoverable error encountered\n",
                                        hostname);
                        } else {
                                /*
                                 * HOST_NOT_FOUND, NO_ADDRESS or NO_DATA
                                 */
                                fprintf(stderr, "Host %s not found\n",
                                        hostname);
                        }
                        return 1;
                }

                memcpy(&addr->sin_addr, hent->h_addr, 4);
                strcpy(hostname, hent->h_name);
        }

        return 0;
}

static int net_setup_client(void)
{
        int fd;
        struct sockaddr_in *addr = &hostname_addr;

        fd = my_socket(AF_INET, SOCK_STREAM, 0);
        if (fd < 0) {
                perror("client: socket");
                return -1;
        }

        if (connect(fd, (struct sockaddr *)addr, sizeof(*addr)) < 0) {
                if (errno == ECONNREFUSED)
                        fprintf(stderr,
                                "\nclient: Connection to %s refused, "
                                "perhaps the server is not started?\n\n",
                                hostname);
                else
                        perror("client: connect");

                close(fd);
                return -1;
        }

        return fd;
}

static int open_client_connections(void)
{
        int cpu;

        cl_fds = calloc(ncpus, sizeof(*cl_fds));
        for (cpu = 0; cpu < ncpus; cpu++) {
                cl_fds[cpu] = net_setup_client();
                if (cl_fds[cpu] < 0)
                        goto err;
        }
        return 0;

err:
        /*
         * cl_fds[cpu] failed to open: close only the ones before it
         */
        while (cpu > 0)
                close(cl_fds[--cpu]);
        free(cl_fds);
        return 1;
}

static void close_client_connections(void)
{
        if (cl_fds) {
                int cpu, *fdp;

                for (cpu = 0, fdp = cl_fds; cpu < ncpus; cpu++, fdp++) {
                        if (*fdp >= 0) {
                                net_send_drops(*fdp);
                                net_close_connection(fdp);
                        }
                }
                free(cl_fds);
        }
}

static void setup_buts(void)
{
        struct list_head *p;

        __list_for_each(p, &devpaths) {
                struct blk_user_trace_setup buts;
                struct devpath *dpp = list_entry(p, struct devpath, head);

                memset(&buts, 0, sizeof(buts));
                buts.buf_size = buf_size;
                buts.buf_nr = buf_nr;
                buts.act_mask = act_mask;

                if (ioctl(dpp->fd, BLKTRACESETUP, &buts) >= 0) {
                        dpp->ncpus = ncpus;
                        dpp->buts_name = strdup(buts.name);
                        if (dpp->stats)
                                free(dpp->stats);
                        /*
                         * calloc() zero-fills; no separate memset needed
                         */
                        dpp->stats = calloc(dpp->ncpus, sizeof(*dpp->stats));
                } else
                        fprintf(stderr, "BLKTRACESETUP(2) %s failed: %d/%s\n",
                                dpp->path, errno, strerror(errno));
        }
}

static void start_buts(void)
{
        struct list_head *p;

        __list_for_each(p, &devpaths) {
                struct devpath *dpp = list_entry(p, struct devpath, head);

                if (ioctl(dpp->fd, BLKTRACESTART) < 0) {
                        fprintf(stderr, "BLKTRACESTART %s failed: %d/%s\n",
                                dpp->path, errno, strerror(errno));
                }
        }
}

static int get_drops(struct devpath *dpp)
{
        int fd, len, drops = 0;
        char fn[MAXPATHLEN + 64], tmp[256];

        snprintf(fn, sizeof(fn), "%s/block/%s/dropped", debugfs_path,
                 dpp->buts_name);

        fd = my_open(fn, O_RDONLY);
        if (fd < 0) {
                /*
                 * This may be ok: the kernel may not support
                 * dropped counts.
                 */
                if (errno != ENOENT)
                        fprintf(stderr, "Could not open %s: %d/%s\n",
                                fn, errno, strerror(errno));
                return 0;
        } else if ((len = read(fd, tmp, sizeof(tmp) - 1)) < 0) {
                fprintf(stderr, "Could not read %s: %d/%s\n",
                        fn, errno, strerror(errno));
        } else {
                tmp[len] = '\0';        /* read() does not null-terminate */
                drops = atoi(tmp);
        }
        close(fd);

        return drops;
}

static void get_all_drops(void)
{
        struct list_head *p;

        __list_for_each(p, &devpaths) {
                struct devpath *dpp = list_entry(p, struct devpath, head);

                dpp->drops = get_drops(dpp);
        }
}

static inline struct trace_buf *alloc_trace_buf(int cpu, int bufsize)
{
        struct trace_buf *tbp;

        tbp = malloc(sizeof(*tbp) + bufsize);
        INIT_LIST_HEAD(&tbp->head);
        tbp->len = 0;
        tbp->buf = (void *)(tbp + 1);
        tbp->cpu = cpu;
        tbp->dpp = NULL;        /* Will be set when tbp is added */

        return tbp;
}

static void free_tracer_heads(struct devpath *dpp)
{
        int cpu;
        struct tracer_devpath_head *hd;

        for (cpu = 0, hd = dpp->heads; cpu < ncpus; cpu++, hd++) {
                if (hd->prev)
                        free(hd->prev);

                pthread_mutex_destroy(&hd->mutex);
        }
        free(dpp->heads);
}

static int setup_tracer_devpaths(void)
{
        struct list_head *p;

        if (net_client_use_send())
                if (open_client_connections())
                        return 1;

        __list_for_each(p, &devpaths) {
                int cpu;
                struct tracer_devpath_head *hd;
                struct devpath *dpp = list_entry(p, struct devpath, head);

                dpp->heads = calloc(ncpus, sizeof(struct tracer_devpath_head));
                for (cpu = 0, hd = dpp->heads; cpu < ncpus; cpu++, hd++) {
                        INIT_LIST_HEAD(&hd->head);
                        pthread_mutex_init(&hd->mutex, NULL);
                        hd->prev = NULL;
                }
        }

        return 0;
}

static inline void add_trace_buf(struct devpath *dpp, int cpu,
                                                struct trace_buf **tbpp)
{
        struct trace_buf *tbp = *tbpp;
        struct tracer_devpath_head *hd = &dpp->heads[cpu];

        tbp->dpp = dpp;

        pthread_mutex_lock(&hd->mutex);
        list_add_tail(&tbp->head, &hd->head);
        pthread_mutex_unlock(&hd->mutex);

        *tbpp = alloc_trace_buf(cpu, buf_size);
}

static inline void incr_entries(int entries_handled)
{
        pthread_mutex_lock(&dp_mutex);
        if (dp_entries == 0)
                pthread_cond_signal(&dp_cond);
        dp_entries += entries_handled;
        pthread_mutex_unlock(&dp_mutex);
}

static void decr_entries(int handled)
{
        pthread_mutex_lock(&dp_mutex);
        dp_entries -= handled;
        pthread_mutex_unlock(&dp_mutex);
}

static int wait_empty_entries(void)
{
        pthread_mutex_lock(&dp_mutex);
        while (!done && dp_entries == 0)
                t_pthread_cond_wait(&dp_cond, &dp_mutex);
        pthread_mutex_unlock(&dp_mutex);

        return !done;
}

static int add_devpath(char *path)
{
        int fd;
        struct devpath *dpp;

        /*
         * Verify device is valid before going too far
         */
        fd = my_open(path, O_RDONLY | O_NONBLOCK);
        if (fd < 0) {
                fprintf(stderr, "Invalid path %s specified: %d/%s\n",
                        path, errno, strerror(errno));
                return 1;
        }

        dpp = malloc(sizeof(*dpp));
        memset(dpp, 0, sizeof(*dpp));
        dpp->path = strdup(path);
        dpp->fd = fd;
        dpp->idx = ndevs++;
        list_add_tail(&dpp->head, &devpaths);

        return 0;
}

static void rel_devpaths(void)
{
        struct list_head *p, *q;

        list_for_each_safe(p, q, &devpaths) {
                struct devpath *dpp = list_entry(p, struct devpath, head);

                list_del(&dpp->head);
                __stop_trace(dpp->fd);
                close(dpp->fd);

                if (dpp->heads)
                        free_tracer_heads(dpp);

                dpp_free(dpp);
                ndevs--;
        }
}

static int flush_subbuf_net(struct trace_buf *tbp)
{
        int fd = cl_fds[tbp->cpu];
        struct devpath *dpp = tbp->dpp;

        if (net_send_header(fd, tbp->cpu, dpp->buts_name, tbp->len))
                return 1;
        else if (net_send_data(fd, tbp->buf, tbp->len) != tbp->len)
                return 1;

        return 0;
}

static int
handle_list_net(__attribute__((__unused__))struct tracer_devpath_head *hd,
                struct list_head *list)
{
        struct trace_buf *tbp;
        struct list_head *p, *q;
        int entries_handled = 0;

        list_for_each_safe(p, q, list) {
                tbp = list_entry(p, struct trace_buf, head);

                list_del(&tbp->head);
                entries_handled++;

                if (cl_fds[tbp->cpu] >= 0) {
                        if (flush_subbuf_net(tbp)) {
                                close(cl_fds[tbp->cpu]);
                                cl_fds[tbp->cpu] = -1;
                        }
                }

                free(tbp);
        }

        return entries_handled;
}

/*
 * Tack 'tbp's buf onto the tail of 'prev's buf
 */
static struct trace_buf *tb_combine(struct trace_buf *prev,
                                    struct trace_buf *tbp)
{
        unsigned long tot_len;

        tot_len = prev->len + tbp->len;
        if (tot_len > buf_size) {
                /*
                 * tbp->head isn't connected (it was 'prev'
                 * so it had been taken off of the list
                 * before). Therefore, we can realloc
                 * the whole structure, as the other fields
                 * are "static".
                 *
                 * Note: realloc() must be given the start of the
                 * allocation ('prev'), not prev->buf, which points
                 * just past the struct within that same allocation.
                 */
                prev = realloc(prev, sizeof(*prev) + tot_len);
                prev->buf = (void *)(prev + 1);
        }

        memcpy(prev->buf + prev->len, tbp->buf, tbp->len);
        prev->len = tot_len;

        free(tbp);
        return prev;
}

static int handle_list_file(struct tracer_devpath_head *hd,
                            struct list_head *list)
{
        int off, t_len, nevents;
        struct blk_io_trace *t;
        struct list_head *p, *q;
        int entries_handled = 0;
        struct trace_buf *tbp, *prev;

        prev = hd->prev;
        list_for_each_safe(p, q, list) {
                tbp = list_entry(p, struct trace_buf, head);
                list_del(&tbp->head);
                entries_handled++;

                /*
                 * If there was some leftover before, tack this new
                 * entry onto the tail of the previous one.
                 */
                if (prev)
                        tbp = tb_combine(prev, tbp);

                /*
                 * See how many whole traces there are - send them
                 * all out in one go.
                 */
                off = 0;
                nevents = 0;
                while (off + (int)sizeof(*t) <= tbp->len) {
                        t = (struct blk_io_trace *)(tbp->buf + off);
                        t_len = sizeof(*t) + t->pdu_len;
                        if (off + t_len > tbp->len)
                                break;

                        off += t_len;
                        nevents++;
                }
                if (nevents)
                        pdc_nev_update(tbp->dpp, tbp->cpu, nevents);

                /*
                 * Write any full set of traces, any remaining data is kept
                 * for the next pass.
                 */
                if (off) {
                        if (write_data(tbp->buf, off) || off == tbp->len) {
                                free(tbp);
                                prev = NULL;
                        } else {
                                /*
                                 * Move valid data to beginning of buffer
                                 */
                                tbp->len -= off;
                                memmove(tbp->buf, tbp->buf + off, tbp->len);
                                prev = tbp;
                        }
                } else
                        prev = tbp;
        }
        hd->prev = prev;

        return entries_handled;
}

static void __process_trace_bufs(void)
{
        int cpu;
        struct list_head *p;
        struct list_head list;
        int handled = 0;

        __list_for_each(p, &devpaths) {
                struct devpath *dpp = list_entry(p, struct devpath, head);
                struct tracer_devpath_head *hd = dpp->heads;

                for (cpu = 0; cpu < ncpus; cpu++, hd++) {
                        pthread_mutex_lock(&hd->mutex);
                        if (list_empty(&hd->head)) {
                                pthread_mutex_unlock(&hd->mutex);
                                continue;
                        }

                        list_replace_init(&hd->head, &list);
                        pthread_mutex_unlock(&hd->mutex);

                        handled += handle_list(hd, &list);
                }
        }

        if (handled)
                decr_entries(handled);
}

static void process_trace_bufs(void)
{
        while (wait_empty_entries())
                __process_trace_bufs();
}

static void clean_trace_bufs(void)
{
        /*
         * No mutex needed here: we're only reading from the lists,
         * tracers are done
         */
        while (dp_entries)
                __process_trace_bufs();
}

static inline void read_err(int cpu, char *ifn)
{
        if (errno != EAGAIN)
                fprintf(stderr, "Thread %d failed read of %s: %d/%s\n",
                        cpu, ifn, errno, strerror(errno));
}

static int net_sendfile(struct io_info *iop)
{
        int ret;

        ret = sendfile(iop->ofd, iop->ifd, NULL, iop->ready);
        if (ret < 0) {
                perror("sendfile");
                return 1;
        } else if (ret < (int)iop->ready) {
                fprintf(stderr, "short sendfile send (%d of %d)\n",
                        ret, iop->ready);
                return 1;
        }

        return 0;
}

static inline int net_sendfile_data(struct tracer *tp, struct io_info *iop)
{
        struct devpath *dpp = iop->dpp;

        if (net_send_header(iop->ofd, tp->cpu, dpp->buts_name, iop->ready))
                return 1;
        return net_sendfile(iop);
}

static int fill_ofname(struct io_info *iop, int cpu)
{
        int len;
        struct stat sb;
        char *dst = iop->ofn;

        if (output_dir)
                len = snprintf(iop->ofn, sizeof(iop->ofn), "%s/", output_dir);
        else
                len = snprintf(iop->ofn, sizeof(iop->ofn), "./");

        if (net_mode == Net_server) {
                struct cl_conn *nc = iop->nc;

                len += sprintf(dst + len, "%s-", nc->ch->hostname);
                len += strftime(dst + len, 64, "%F-%T/",
                                gmtime(&iop->dpp->cl_connect_time));
        }

        if (stat(iop->ofn, &sb) < 0) {
                if (errno != ENOENT) {
                        fprintf(stderr,
                                "Destination dir %s stat failed: %d/%s\n",
                                iop->ofn, errno, strerror(errno));
                        return 1;
                }
                if (mkdir(iop->ofn, 0755) < 0) {
                        fprintf(stderr,
                                "Destination dir %s can't be made: %d/%s\n",
                                iop->ofn, errno, strerror(errno));
                        return 1;
                }
        }

        /*
         * Only sizeof(ofn) - len bytes remain past the directory prefix
         */
        if (output_name)
                snprintf(iop->ofn + len, sizeof(iop->ofn) - len,
                         "%s.blktrace.%d", output_name, cpu);
        else
                snprintf(iop->ofn + len, sizeof(iop->ofn) - len,
                         "%s.blktrace.%d", iop->dpp->buts_name, cpu);

        return 0;
}

static int set_vbuf(struct io_info *iop, int mode, size_t size)
{
        iop->obuf = malloc(size);
        /*
         * setvbuf() returns nonzero (not necessarily negative) on failure
         */
        if (setvbuf(iop->ofp, iop->obuf, mode, size)) {
                fprintf(stderr, "setvbuf(%s, %d) failed: %d/%s\n",
                        iop->dpp->path, (int)size, errno,
                        strerror(errno));
                free(iop->obuf);
                return 1;
        }

        return 0;
}

static int iop_open(struct io_info *iop, int cpu)
{
        iop->ofd = -1;
        if (fill_ofname(iop, cpu))
                return 1;

        iop->ofp = my_fopen(iop->ofn, "w+");
        if (iop->ofp == NULL) {
                fprintf(stderr, "Open output file %s failed: %d/%s\n",
                        iop->ofn, errno, strerror(errno));
                return 1;
        }

        if (set_vbuf(iop, _IOLBF, FILE_VBUF_SIZE)) {
                fprintf(stderr, "set_vbuf for file %s failed: %d/%s\n",
                        iop->ofn, errno, strerror(errno));
                fclose(iop->ofp);
                return 1;
        }

        iop->ofd = fileno(iop->ofp);
        return 0;
}

static void close_iop(struct io_info *iop)
{
        struct mmap_info *mip = &iop->mmap_info;

        if (mip->fs_buf)
                munmap(mip->fs_buf, mip->fs_buf_len);

        if (!piped_output) {
                if (ftruncate(fileno(iop->ofp), mip->fs_size) < 0) {
                        fprintf(stderr,
                                "Ignoring err: ftruncate(%s): %d/%s\n",
                                iop->ofn, errno, strerror(errno));
                }
        }

        if (iop->ofp)
                fclose(iop->ofp);
        if (iop->obuf)
                free(iop->obuf);
}

static void close_ios(struct tracer *tp)
{
        while (tp->nios > 0) {
                struct io_info *iop = &tp->ios[--tp->nios];

                iop->dpp->drops = get_drops(iop->dpp);
                if (iop->ifd >= 0)
                        close(iop->ifd);

                if (iop->ofp)
                        close_iop(iop);
                else if (iop->ofd >= 0) {
                        struct devpath *dpp = iop->dpp;

                        net_send_close(iop->ofd, dpp->buts_name, dpp->drops);
                        net_close_connection(&iop->ofd);
                }
        }

        free(tp->ios);
        free(tp->pfds);
}

static int open_ios(struct tracer *tp)
{
        struct pollfd *pfd;
        struct io_info *iop;
        struct list_head *p;

        /*
         * calloc() zero-fills; no separate memset needed
         */
        tp->ios = calloc(ndevs, sizeof(struct io_info));
        tp->pfds = calloc(ndevs, sizeof(struct pollfd));

        tp->nios = 0;
        iop = tp->ios;
        pfd = tp->pfds;
        __list_for_each(p, &devpaths) {
                struct devpath *dpp = list_entry(p, struct devpath, head);

                iop->dpp = dpp;
                iop->ofd = -1;
                snprintf(iop->ifn, sizeof(iop->ifn), "%s/block/%s/trace%d",
                        debugfs_path, dpp->buts_name, tp->cpu);

                iop->ifd = my_open(iop->ifn, O_RDONLY | O_NONBLOCK);
                if (iop->ifd < 0) {
                        fprintf(stderr, "Thread %d failed open %s: %d/%s\n",
                                tp->cpu, iop->ifn, errno, strerror(errno));
                        return 1;
                }

                init_mmap_info(&iop->mmap_info);

                pfd->fd = iop->ifd;
                pfd->events = POLLIN;

                if (piped_output)
                        ;       /* Piped output: no per-thread output setup */
                else if (net_client_use_sendfile()) {
                        iop->ofd = net_setup_client();
                        if (iop->ofd < 0)
                                goto err;
                        net_send_open(iop->ofd, tp->cpu, dpp->buts_name);
                } else if (net_mode == Net_none) {
                        if (iop_open(iop, tp->cpu))
                                goto err;
                } else {
                        /*
                         * This ensures that the server knows about all
                         * connections & devices before _any_ closes
                         */
                        net_send_open(cl_fds[tp->cpu], tp->cpu, dpp->buts_name);
                }

                pfd++;
                iop++;
                tp->nios++;
        }

        return 0;

err:
        close(iop->ifd);        /* tp->nios _not_ bumped */
        close_ios(tp);
        return 1;
}

static int handle_pfds_file(struct tracer *tp, int nevs, int force_read)
{
        struct mmap_info *mip;
        int i, ret, nentries = 0;
        struct pollfd *pfd = tp->pfds;
        struct io_info *iop = tp->ios;

        for (i = 0; nevs > 0 && i < ndevs; i++, pfd++, iop++) {
                if (pfd->revents & POLLIN || force_read) {
                        mip = &iop->mmap_info;

                        /*
                         * setup_mmap() returns nonzero on failure
                         */
                        ret = setup_mmap(iop->ofd, buf_size, mip);
                        if (ret) {
                                pfd->events = 0;
                                break;
                        }

                        ret = read(iop->ifd, mip->fs_buf + mip->fs_off,
                                   buf_size);
                        if (ret > 0) {
                                pdc_dr_update(iop->dpp, tp->cpu, ret);
                                mip->fs_size += ret;
                                mip->fs_off += ret;
                                nentries++;
                        } else if (ret == 0) {
                                /*
                                 * Short reads after we're done stop us
                                 * from trying reads.
                                 */
                                if (tp->is_done)
                                        clear_events(pfd);
                        } else {
                                read_err(tp->cpu, iop->ifn);
                                if (errno != EAGAIN || tp->is_done)
                                        clear_events(pfd);
                        }
                        nevs--;
                }
        }

        return nentries;
}

static int handle_pfds_netclient(struct tracer *tp, int nevs, int force_read)
{
        struct stat sb;
        int i, nentries = 0;
        struct pollfd *pfd = tp->pfds;
        struct io_info *iop = tp->ios;

        for (i = 0; i < ndevs; i++, pfd++, iop++) {
                if (pfd->revents & POLLIN || force_read) {
                        if (fstat(iop->ifd, &sb) < 0) {
                                perror(iop->ifn);
                                pfd->events = 0;
                        } else if (sb.st_size > (off_t)iop->data_queued) {
                                iop->ready = sb.st_size - iop->data_queued;
                                iop->data_queued = sb.st_size;

                                if (!net_sendfile_data(tp, iop)) {
                                        pdc_dr_update(iop->dpp, tp->cpu,
                                                      iop->ready);
                                        nentries++;
                                } else
                                        clear_events(pfd);
                        }
                        if (--nevs == 0)
                                break;
                }
        }

        if (nentries)
                incr_entries(nentries);

        return nentries;
}
1723
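     /*
      * pfd handler for piped output and the send()-based net client: read
      * into a heap-allocated trace_buf and queue it for the processing
      * thread. add_trace_buf() consumes the buffer and installs a fresh
      * one through &tbp, so the free() at the bottom only releases the
      * last, unqueued buffer.
      */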
1724 static int handle_pfds_entries(struct tracer *tp, int nevs, int force_read)
1725 {
1726         int i, nentries = 0;
1727         struct trace_buf *tbp;
1728         struct pollfd *pfd = tp->pfds;
1729         struct io_info *iop = tp->ios;
1730
1731         tbp = alloc_trace_buf(tp->cpu, buf_size);
1732         for (i = 0; i < ndevs; i++, pfd++, iop++) {
1733                 if (pfd->revents & POLLIN || force_read) {
1734                         tbp->len = read(iop->ifd, tbp->buf, buf_size);
1735                         if (tbp->len > 0) {
1736                                 pdc_dr_update(iop->dpp, tp->cpu, tbp->len);
1737                                 add_trace_buf(iop->dpp, tp->cpu, &tbp);
1738                                 nentries++;
1739                         } else if (tbp->len == 0) {
1740                                 /*
1741                                  * Zero-length read after we're done:
1742                                  * buffer drained, stop polling this fd.
1743                                  */
1744                                 if (tp->is_done)
1745                                         clear_events(pfd);
1746                         } else {
1747                                 read_err(tp->cpu, iop->ifn);
1748                                 if (errno != EAGAIN || tp->is_done)
1749                                         clear_events(pfd);
1750                         }
1751                         if (!piped_output && --nevs == 0)
1752                                 break;
1753                 }
1754         }
1755         free(tbp);
1756
1757         if (nentries)
1758                 incr_entries(nentries);
1759
1760         return nentries;
1761 }
1762
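     /*
      * Per-CPU tracer thread: pin to the assigned CPU, open the
      * per-device I/O state, then poll until told to stop. Piped output
      * polls with a short timeout so partial buffers move frequently;
      * once is_done is set, force reads until every device returns a
      * zero-length read.
      */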
1763 static void *thread_main(void *arg)
1764 {
1765         int ret, ndone, to_val;
1766         struct tracer *tp = arg;
1767
1768         ret = lock_on_cpu(tp->cpu);
1769         if (ret)
1770                 goto err;
1771
1772         ret = open_ios(tp);
1773         if (ret)
1774                 goto err;
1775
1776         if (piped_output)
1777                 to_val = 50;            /* Frequent partial handles */
1778         else
1779                 to_val = 500;           /* 1/2 second intervals */
1780
1782         tracer_signal_ready(tp, Th_running, 0);
1783         tracer_wait_unblock(tp);
1784
1785         while (!tp->is_done) {
1786                 ndone = poll(tp->pfds, ndevs, to_val);
1787                 if (ndone < 0) {
1788                         if (errno != EINTR)
1789                                 fprintf(stderr, "Thread %d poll failed: %d/%s\n", tp->cpu, errno, strerror(errno));
1790                 } else if (ndone || piped_output)
1791                         (void)handle_pfds(tp, ndone, piped_output);
1792         }
1793
1794         /*
1795          * Trace is stopped, pull data until we get a short read
1796          */
1797         while (handle_pfds(tp, ndevs, 1) > 0)
1798                 ;
1799
1800         close_ios(tp);
1801         tracer_signal_ready(tp, Th_leaving, 0);
1802         return NULL;
1803
1804 err:
1805         tracer_signal_ready(tp, Th_error, ret);
1806         return NULL;
1807 }
1808
1809 static int start_tracer(int cpu)
1810 {
1811         struct tracer *tp;
1812
1813         tp = malloc(sizeof(*tp));
1814         memset(tp, 0, sizeof(*tp));
1815
1816         INIT_LIST_HEAD(&tp->head);
1817         tp->status = 0;
1818         tp->cpu = cpu;
1819
1820         if (pthread_create(&tp->thread, NULL, thread_main, tp)) {
1821                 fprintf(stderr, "FAILED to start thread on CPU %d: %d/%s\n",
1822                         cpu, errno, strerror(errno));
1823                 free(tp);
1824                 return 1;
1825         }
1826
1827         list_add_tail(&tp->head, &tracers);
1828         return 0;
1829 }
1830
1831 static void start_tracers(void)
1832 {
1833         int cpu;
1834         struct list_head *p;
1835
1836         for (cpu = 0; cpu < ncpus; cpu++)
1837                 if (start_tracer(cpu))
1838                         break;
1839
1840         wait_tracers_ready(cpu);
1841
1842         __list_for_each(p, &tracers) {
1843                 struct tracer *tp = list_entry(p, struct tracer, head);
1844                 if (tp->status)
1845                         fprintf(stderr,
1846                                 "FAILED to start thread on CPU %d: %d/%s\n",
1847                                 tp->cpu, tp->status, strerror(tp->status));
1848         }
1849 }
1850
1851 static void stop_tracers(void)
1852 {
1853         struct list_head *p;
1854
1855         /*
1856          * Stop the tracing - makes the tracer threads clean up quicker.
1857          */
1858         __list_for_each(p, &devpaths) {
1859                 struct devpath *dpp = list_entry(p, struct devpath, head);
1860                 (void)ioctl(dpp->fd, BLKTRACESTOP);
1861         }
1862
1863         /*
1864          * Tell each tracer to quit
1865          */
1866         __list_for_each(p, &tracers) {
1867                 struct tracer *tp = list_entry(p, struct tracer, head);
1868                 tp->is_done = 1;
1869         }
1870 }
1871
1872 static void del_tracers(void)
1873 {
1874         struct list_head *p, *q;
1875
1876         list_for_each_safe(p, q, &tracers) {
1877                 struct tracer *tp = list_entry(p, struct tracer, head);
1878
1879                 list_del(&tp->head);
1880                 free(tp);
1881         }
1882 }
1883
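     /*
      * Reap the tracer threads. In the modes that funnel data through
      * devpath lists (piped output, send()-based net client), the buffer
      * processing loop runs here in the main thread until the tracers
      * signal that they are leaving.
      */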
1884 static void wait_tracers(void)
1885 {
1886         struct list_head *p;
1887
1888         if (use_tracer_devpaths())
1889                 process_trace_bufs();
1890
1891         wait_tracers_leaving();
1892
1893         __list_for_each(p, &tracers) {
1894                 int ret;
1895                 struct tracer *tp = list_entry(p, struct tracer, head);
1896
1897                 ret = pthread_join(tp->thread, NULL);
1898                 if (ret)
1899                         fprintf(stderr, "Thread join %d failed %d\n",
1900                                 tp->cpu, ret);
1901         }
1902
1903         if (use_tracer_devpaths())
1904                 clean_trace_bufs();
1905
1906         get_all_drops();
1907 }
1908
1909 static void exit_tracing(void)
1910 {
1911         signal(SIGINT, SIG_IGN);
1912         signal(SIGHUP, SIG_IGN);
1913         signal(SIGTERM, SIG_IGN);
1914         signal(SIGALRM, SIG_IGN);
1915
1916         stop_tracers();
1917         wait_tracers();
1918         del_tracers();
1919         rel_devpaths();
1920 }
1921
1922 static void handle_sigint(__attribute__((__unused__)) int sig)
1923 {
1924         done = 1;
1925         stop_tracers();
1926 }
1927
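     /*
      * Print per-CPU and per-device totals. Event counts that were never
      * reported exactly are estimated as bytes read divided by the fixed
      * trace record size, so variable-length PDU payload skews the
      * estimate; KiB figures are rounded up.
      */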
1928 static void show_stats(struct list_head *devpaths)
1929 {
1930         FILE *ofp;
1931         struct list_head *p;
1932         unsigned long long nevents, data_read;
1933         unsigned long long total_drops = 0;
1934         unsigned long long total_events = 0;
1935
1936         if (piped_output)
1937                 ofp = my_fopen("/dev/null", "w");
1938         else
1939                 ofp = stdout;
1940
1941         __list_for_each(p, devpaths) {
1942                 int cpu;
1943                 struct pdc_stats *sp;
1944                 struct devpath *dpp = list_entry(p, struct devpath, head);
1945
1946                 if (net_mode == Net_server)
1947                         printf("server: end of run for %s:%s\n",
1948                                 dpp->ch->hostname, dpp->buts_name);
1949
1950                 data_read = 0;
1951                 nevents = 0;
1952
1953                 fprintf(ofp, "=== %s ===\n", dpp->buts_name);
1954                 for (cpu = 0, sp = dpp->stats; cpu < dpp->ncpus; cpu++, sp++) {
1955                         /*
1956                          * Estimate events if not known...
1957                          */
1958                         if (sp->nevents == 0) {
1959                                 sp->nevents = sp->data_read /
1960                                                 sizeof(struct blk_io_trace);
1961                         }
1962
1963                         fprintf(ofp,
1964                                 "  CPU%3d: %20llu events, %8llu KiB data\n",
1965                                 cpu, sp->nevents, (sp->data_read + 1023) >> 10);
1966
1967                         data_read += sp->data_read;
1968                         nevents += sp->nevents;
1969                 }
1970
1971                 fprintf(ofp, "  Total:  %20llu events (dropped %llu),"
1972                              " %8llu KiB data\n", nevents,
1973                              dpp->drops, (data_read + 1023) >> 10);
1974
1975                 total_drops += dpp->drops;
1976                 total_events += (nevents + dpp->drops);
1977         }
1978
1979         fflush(ofp);
1980         if (piped_output)
1981                 fclose(ofp);
1982
1983         if (total_drops) {
1984                 double drops_ratio = 1.0;
1985
1986                 if (total_events)
1987                         drops_ratio = (double)total_drops/(double)total_events;
1988
1989                 fprintf(stderr, "\nYou have %llu (%5.1lf%%) dropped events\n"
1990                                 "Consider using a larger buffer size (-b) "
1991                                 "and/or more buffers (-n)\n",
1992                         total_drops, 100.0 * drops_ratio);
1993         }
1994 }
1995
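     /*
      * Parse the command line. A typical local invocation (illustrative
      * only):
      *
      *	blktrace -d /dev/sda -o sda_trace -b 1024 -n 8
      *
      * traces /dev/sda using eight 1 MiB relay sub-buffers per CPU,
      * typically writing sda_trace.blktrace.<cpu> output files.
      */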
1996 static int handle_args(int argc, char *argv[])
1997 {
1998         int c, i;
1999         struct statfs st;
2000         int act_mask_tmp = 0;
2001
2002         while ((c = getopt_long(argc, argv, S_OPTS, l_opts, NULL)) >= 0) {
2003                 switch (c) {
2004                 case 'a':
2005                         i = find_mask_map(optarg);
2006                         if (i < 0) {
2007                                 fprintf(stderr, "Invalid action mask %s\n",
2008                                         optarg);
2009                                 return 1;
2010                         }
2011                         act_mask_tmp |= i;
2012                         break;
2013
2014                 case 'A':
2015                         if ((sscanf(optarg, "%x", &i) != 1) ||
2016                                                         !valid_act_opt(i)) {
2017                                 fprintf(stderr,
2018                                         "Invalid set action mask %s/0x%x\n",
2019                                         optarg, i);
2020                                 return 1;
2021                         }
2022                         act_mask_tmp = i;
2023                         break;
2024
2025                 case 'd':
2026                         if (add_devpath(optarg) != 0)
2027                                 return 1;
2028                         break;
2029
2030                 case 'I': {
2031                         char dev_line[256];
2032                         FILE *ifp = my_fopen(optarg, "r");
2033
2034                         if (!ifp) {
2035                                 fprintf(stderr,
2036                                         "Invalid file for devices %s\n",
2037                                         optarg);
2038                                 return 1;
2039                         }
2040
2041                         while (fscanf(ifp, "%255s\n", dev_line) == 1)
2042                                 if (add_devpath(dev_line) != 0) {
2043                                         fclose(ifp);
2044                                         return 1;
                                }
                        fclose(ifp);
                        break;
2045                 }
2046
2047                 case 'r':
2048                         debugfs_path = optarg;
2049                         break;
2050
2051                 case 'o':
2052                         output_name = optarg;
2053                         break;
2054                 case 'k':
2055                         kill_running_trace = 1;
2056                         break;
2057                 case 'w':
2058                         stop_watch = atoi(optarg);
2059                         if (stop_watch <= 0) {
2060                                 fprintf(stderr,
2061                                         "Invalid stopwatch value (%d secs)\n",
2062                                         stop_watch);
2063                                 return 1;
2064                         }
2065                         break;
2066                 case 'V':
2067                 case 'v':
2068                         printf("%s version %s\n", argv[0], blktrace_version);
2069                         exit(0);
2070                         /*NOTREACHED*/
2071                 case 'b':
2072                         buf_size = strtoul(optarg, NULL, 10);
2073                         if (buf_size <= 0 || buf_size > 16*1024) {
2074                                 fprintf(stderr, "Invalid buffer size (%lu)\n",
2075                                         buf_size);
2076                                 return 1;
2077                         }
2078                         buf_size <<= 10;
2079                         break;
2080                 case 'n':
2081                         buf_nr = strtoul(optarg, NULL, 10);
2082                         if (buf_nr <= 0) {
2083                                 fprintf(stderr,
2084                                         "Invalid buffer nr (%lu)\n", buf_nr);
2085                                 return 1;
2086                         }
2087                         break;
2088                 case 'D':
2089                         output_dir = optarg;
2090                         break;
2091                 case 'h':
2092                         net_mode = Net_client;
2093                         strncpy(hostname, optarg, sizeof(hostname) - 1);
2094                         break;
2095                 case 'l':
2096                         net_mode = Net_server;
2097                         break;
2098                 case 'p':
2099                         net_port = atoi(optarg);
2100                         break;
2101                 case 's':
2102                         net_use_sendfile = 0;
2103                         break;
2104                 default:
2105                         show_usage(argv[0]);
2106                         exit(1);
2107                         /*NOTREACHED*/
2108                 }
2109         }
2110
2111         while (optind < argc)
2112                 if (add_devpath(argv[optind++]) != 0)
2113                         return 1;
2114
2115         if (net_mode != Net_server && ndevs == 0) {
2116                 show_usage(argv[0]);
2117                 return 1;
2118         }
2119
2120         if (statfs(debugfs_path, &st) < 0 || st.f_type != (long)DEBUGFS_TYPE) {
2121                 fprintf(stderr, "Invalid debug path %s: %d/%s\n",
2122                         debugfs_path, errno, strerror(errno));
2123                 return 1;
2124         }
2125
2126         if (act_mask_tmp != 0)
2127                 act_mask = act_mask_tmp;
2128
2129         if (net_mode == Net_client && net_setup_addr())
2130                 return 1;
2131
2132         /*
2133          * Set up for appropriate PFD handler based upon output name.
2134          */
2135         if (net_client_use_sendfile())
2136                 handle_pfds = handle_pfds_netclient;
2137         else if (net_client_use_send())
2138                 handle_pfds = handle_pfds_entries;
2139         else if (output_name && (strcmp(output_name, "-") == 0)) {
2140                 piped_output = 1;
2141                 handle_pfds = handle_pfds_entries;
2142                 pfp = stdout;
2143                 setvbuf(pfp, NULL, _IONBF, 0);
2144         } else
2145                 handle_pfds = handle_pfds_file;
2146         return 0;
2147 }
2148
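     /*
      * Register a freshly accepted connection with both its client host
      * and the server. ns->pfds is kept at connects + 1 entries because
      * slot 0 is always the listening socket (see net_setup_pfds()).
      */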
2149 static void ch_add_connection(struct net_server_s *ns, struct cl_host *ch,
2150                               int fd)
2151 {
2152         struct cl_conn *nc;
2153
2154         nc = malloc(sizeof(*nc));
2155         memset(nc, 0, sizeof(*nc));
2156
2157         time(&nc->connect_time);
2158         nc->ch = ch;
2159         nc->fd = fd;
2160         nc->ncpus = -1;
2161
2162         list_add_tail(&nc->ch_head, &ch->conn_list);
2163         ch->connects++;
2164
2165         list_add_tail(&nc->ns_head, &ns->conn_list);
2166         ns->connects++;
2167         ns->pfds = realloc(ns->pfds, (ns->connects+1) * sizeof(struct pollfd));
2168 }
2169
2170 static void ch_rem_connection(struct net_server_s *ns, struct cl_host *ch,
2171                               struct cl_conn *nc)
2172 {
2173         net_close_connection(&nc->fd);
2174
2175         list_del(&nc->ch_head);
2176         ch->connects--;
2177
2178         list_del(&nc->ns_head);
2179         ns->connects--;
2180         ns->pfds = realloc(ns->pfds, (ns->connects+1) * sizeof(struct pollfd));
2181
2182         free(nc);
2183 }
2184
2185 static struct cl_host *net_find_client_host(struct net_server_s *ns,
2186                                             struct in_addr cl_in_addr)
2187 {
2188         struct list_head *p;
2189
2190         __list_for_each(p, &ns->ch_list) {
2191                 struct cl_host *ch = list_entry(p, struct cl_host, head);
2192
2193                 if (in_addr_eq(ch->cl_in_addr, cl_in_addr))
2194                         return ch;
2195         }
2196
2197         return NULL;
2198 }
2199
2200 static struct cl_host *net_add_client_host(struct net_server_s *ns,
2201                                            struct sockaddr_in *addr)
2202 {
2203         struct cl_host *ch;
2204
2205         ch = malloc(sizeof(*ch));
2206         memset(ch, 0, sizeof(*ch));
2207
2208         ch->ns = ns;
2209         ch->cl_in_addr = addr->sin_addr;
2210         list_add_tail(&ch->head, &ns->ch_list);
2211         ns->nchs++;
2212
2213         ch->hostname = strdup(inet_ntoa(addr->sin_addr));
2214         printf("server: connection from %s\n", ch->hostname);
2215
2216         INIT_LIST_HEAD(&ch->conn_list);
2217         INIT_LIST_HEAD(&ch->devpaths);
2218
2219         return ch;
2220 }
2221
2222 static void device_done(struct devpath *dpp, int ncpus)
2223 {
2224         int cpu;
2225         struct io_info *iop;
2226
2227         for (cpu = 0, iop = dpp->ios; cpu < ncpus; cpu++, iop++)
2228                 close_iop(iop);
2229
2230         list_del(&dpp->head);
2231         dpp_free(dpp);
2232 }
2233
2234 static void net_ch_remove(struct cl_host *ch, int ncpus)
2235 {
2236         struct list_head *p, *q;
2237         struct net_server_s *ns = ch->ns;
2238
2239         list_for_each_safe(p, q, &ch->devpaths) {
2240                 struct devpath *dpp = list_entry(p, struct devpath, head);
2241                 device_done(dpp, ncpus);
2242         }
2243
2244         list_for_each_safe(p, q, &ch->conn_list) {
2245                 struct cl_conn *nc = list_entry(p, struct cl_conn, ch_head);
2246
2247                 ch_rem_connection(ns, ch, nc);
2248         }
2249
2250         list_del(&ch->head);
2251         ns->nchs--;
2252
2253         if (ch->hostname)
2254                 free(ch->hostname);
2255         free(ch);
2256 }
2257
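     /*
      * Accept a new TCP connection. Connections are grouped by peer
      * address into a cl_host, so a client that opens one socket per
      * tracer CPU shows up as a single host with several connections.
      */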
2258 static void net_add_connection(struct net_server_s *ns)
2259 {
2260         int fd;
2261         struct cl_host *ch;
2262         socklen_t socklen = sizeof(ns->addr);
2263
2264         fd = my_accept(ns->listen_fd, (struct sockaddr *)&ns->addr, &socklen);
2265         if (fd < 0) {
2266                 /*
2267                  * This is OK: we just won't accept this connection,
2268                  * nothing fatal.
2269                  */
2270                 perror("accept");
2271         } else {
2272                 ch = net_find_client_host(ns, ns->addr.sin_addr);
2273                 if (!ch)
2274                         ch = net_add_client_host(ns, &ns->addr);
2275
2276                 ch_add_connection(ns, ch, fd);
2277         }
2278 }
2279
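     /*
      * Server-side "open" of a remote device: build a devpath named by
      * the header, with per-CPU stats and one io_info (output file plus
      * mmap state) per client CPU. On failure, unwind the iops opened
      * so far.
      */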
2280 static struct devpath *nc_add_dpp(struct cl_conn *nc,
2281                                   struct blktrace_net_hdr *bnh,
2282                                   time_t connect_time)
2283 {
2284         int cpu;
2285         struct io_info *iop;
2286         struct devpath *dpp;
2287
2288         dpp = malloc(sizeof(*dpp));
2289         memset(dpp, 0, sizeof(*dpp));
2290
2291         dpp->buts_name = strdup(bnh->buts_name);
2292         dpp->path = strdup(bnh->buts_name);
2293         dpp->fd = -1;
2294         dpp->ch = nc->ch;
2295         dpp->cl_id = bnh->cl_id;
2296         dpp->cl_connect_time = connect_time;
2297         dpp->ncpus = nc->ncpus;
2298         dpp->stats = calloc(dpp->ncpus, sizeof(*dpp->stats));
2299         memset(dpp->stats, 0, dpp->ncpus * sizeof(*dpp->stats));
2300
2301         list_add_tail(&dpp->head, &nc->ch->devpaths);
2302         nc->ch->ndevs++;
2303
2304         dpp->ios = calloc(nc->ncpus, sizeof(*iop));
2305         memset(dpp->ios, 0, nc->ncpus * sizeof(*iop));
2306
2307         for (cpu = 0, iop = dpp->ios; cpu < nc->ncpus; cpu++, iop++) {
2308                 iop->dpp = dpp;
2309                 iop->nc = nc;
2310                 init_mmap_info(&iop->mmap_info);
2311
2312                 if (iop_open(iop, cpu))
2313                         goto err;
2314         }
2315
2316         return dpp;
2317
2318 err:
2319         /*
2320          * Need to unravel what's been done...
2321          */
2322         while (cpu >= 0)
2323                 close_iop(&dpp->ios[cpu--]);
2324         dpp_free(dpp);
2325
2326         return NULL;
2327 }
2328
2329 static struct devpath *nc_find_dpp(struct cl_conn *nc,
2330                                    struct blktrace_net_hdr *bnh)
2331 {
2332         struct list_head *p;
2333         time_t connect_time = nc->connect_time;
2334
2335         __list_for_each(p, &nc->ch->devpaths) {
2336                 struct devpath *dpp = list_entry(p, struct devpath, head);
2337
2338                 if (!strcmp(dpp->buts_name, bnh->buts_name))
2339                         return dpp;
2340
2341                 if (dpp->cl_id == bnh->cl_id)
2342                         connect_time = dpp->cl_connect_time;
2343         }
2344
2345         return nc_add_dpp(nc, bnh, connect_time);
2346 }
2347
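     /*
      * Payload receive path: grow the mmap window on this CPU's output
      * file and pull exactly bnh->len bytes into it. Failures are fatal,
      * since the stream framing could no longer be trusted.
      */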
2348 static void net_client_read_data(struct cl_conn *nc, struct devpath *dpp,
2349                                  struct blktrace_net_hdr *bnh)
2350 {
2351         int ret;
2352         struct io_info *iop = &dpp->ios[bnh->cpu];
2353         struct mmap_info *mip = &iop->mmap_info;
2354
2355         if (setup_mmap(iop->ofd, bnh->len, &iop->mmap_info)) {
2356                 fprintf(stderr, "ncd(%s:%d): mmap failed\n",
2357                         nc->ch->hostname, nc->fd);
2358                 exit(1);
2359         }
2360
2361         ret = net_recv_data(nc->fd, mip->fs_buf + mip->fs_off, bnh->len);
2362         if (ret > 0) {
2363                 pdc_dr_update(dpp, bnh->cpu, ret);
2364                 mip->fs_size += ret;
2365                 mip->fs_off += ret;
2366         } else if (ret < 0)
2367                 exit(1);
2368 }
2369
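     /*
      * Every client message starts with a blktrace_net_hdr, sent in the
      * client's byte order; the magic validates the stream and tells us
      * (via check_data_endianness) whether to byte-swap. bnh.len then
      * encodes the payload:
      *   0 -> client opened a device (no payload)
      *   1 -> client closed a device (drop count carried in bnh.cpu)
      *   n -> n bytes of trace data for CPU bnh.cpu follow
      */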
2370 /*
2371  * Returns 1 if we closed a host - invalidates other polling information
2372  * that may be present.
2373  */
2374 static int net_client_data(struct cl_conn *nc)
2375 {
2376         int ret;
2377         struct devpath *dpp;
2378         struct blktrace_net_hdr bnh;
2379
2380         ret = net_get_header(nc, &bnh);
2381         if (ret == 0)
2382                 return 0;
2383
2384         if (ret < 0) {
2385                 fprintf(stderr, "ncd(%d): header read failed\n", nc->fd);
2386                 exit(1);
2387         }
2388
2389         if (data_is_native == -1 && check_data_endianness(bnh.magic)) {
2390                 fprintf(stderr, "ncd(%d): received data is bad\n", nc->fd);
2391                 exit(1);
2392         }
2393
2394         if (!data_is_native) {
2395                 bnh.magic = be32_to_cpu(bnh.magic);
2396                 bnh.cpu = be32_to_cpu(bnh.cpu);
2397                 bnh.max_cpus = be32_to_cpu(bnh.max_cpus);
2398                 bnh.len = be32_to_cpu(bnh.len);
2399                 bnh.cl_id = be32_to_cpu(bnh.cl_id);
2400                 bnh.buf_size = be32_to_cpu(bnh.buf_size);
2401                 bnh.buf_nr = be32_to_cpu(bnh.buf_nr);
2402                 bnh.page_size = be32_to_cpu(bnh.page_size);
2403         }
2404
2405         if ((bnh.magic & 0xffffff00) != BLK_IO_TRACE_MAGIC) {
2406                 fprintf(stderr, "ncd(%s:%d): bad data magic\n",
2407                         nc->ch->hostname, nc->fd);
2408                 exit(1);
2409         }
2410
2411         if (nc->ncpus == -1)
2412                 nc->ncpus = bnh.max_cpus;
2413
2414         /*
2415          * len == 0 means the other end is sending us a new connection/dpp
2416          * len == 1 means that the other end signalled end-of-run
2417          */
2418         dpp = nc_find_dpp(nc, &bnh);
2419         if (bnh.len == 0) {
2420                 /*
2421                  * Just adding in the dpp above is enough
2422                  */
2423                 ack_open_close(nc->fd, dpp->buts_name);
2424                 nc->ch->cl_opens++;
2425         } else if (bnh.len == 1) {
2426                 /*
2427                  * overload cpu count with dropped events
2428                  */
2429                 dpp->drops = bnh.cpu;
2430
2431                 ack_open_close(nc->fd, dpp->buts_name);
2432                 if (--nc->ch->cl_opens == 0) {
2433                         show_stats(&nc->ch->devpaths);
2434                         net_ch_remove(nc->ch, nc->ncpus);
2435                         return 1;
2436                 }
2437         } else
2438                 net_client_read_data(nc, dpp, &bnh);
2439
2440         return 0;
2441 }
2442
2443 static void handle_client_data(struct net_server_s *ns, int events)
2444 {
2445         struct cl_conn *nc;
2446         struct pollfd *pfd;
2447         struct list_head *p, *q;
2448
2449         pfd = &ns->pfds[1];
2450         list_for_each_safe(p, q, &ns->conn_list) {
2451                 if (pfd->revents & POLLIN) {
2452                         nc = list_entry(p, struct cl_conn, ns_head);
2453
2454                         if (net_client_data(nc) || --events == 0)
2455                                 break;
2456                 }
2457                 pfd++;
2458         }
2459 }
2460
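     /*
      * (Re)build the poll set: slot 0 watches the listening socket, and
      * slots 1..connects mirror conn_list in order; handle_client_data()
      * walks the two in lockstep.
      */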
2461 static void net_setup_pfds(struct net_server_s *ns)
2462 {
2463         struct pollfd *pfd;
2464         struct list_head *p;
2465
2466         ns->pfds[0].fd = ns->listen_fd;
2467         ns->pfds[0].events = POLLIN;
2468
2469         pfd = &ns->pfds[1];
2470         __list_for_each(p, &ns->conn_list) {
2471                 struct cl_conn *nc = list_entry(p, struct cl_conn, ns_head);
2472
2473                 pfd->fd = nc->fd;
2474                 pfd->events = POLLIN;
2475                 pfd++;
2476         }
2477 }
2478
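     /*
      * Main server loop: rebuild the poll set and block (no timeout)
      * across the listen fd and all live connections. The set is rebuilt
      * every iteration because closing a host mid-pass invalidates the
      * remaining pollfd entries.
      */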
2479 static int net_server_handle_connections(struct net_server_s *ns)
2480 {
2481         int events;
2482
2483         printf("server: waiting for connections...\n");
2484
2485         while (!done) {
2486                 net_setup_pfds(ns);
2487                 events = poll(ns->pfds, ns->connects + 1, -1);
2488                 if (events < 0) {
2489                         if (errno != EINTR) {
2490                                 perror("FATAL: poll error");
2491                                 return 1;
2492                         }
2493                 } else if (events > 0) {
2494                         if (ns->pfds[0].revents & POLLIN) {
2495                                 net_add_connection(ns);
2496                                 events--;
2497                         }
2498
2499                         if (events)
2500                                 handle_client_data(ns, events);
2501                 }
2502         }
2503
2504         return 0;
2505 }
2506
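     /*
      * Server mode (-l): bind INADDR_ANY:net_port, listen, and serve
      * until a signal sets 'done'. A minimal pairing, assuming the
      * default port on both ends:
      *
      *	server$ blktrace -l
      *	client$ blktrace -h <server-ip> -d /dev/sda
      */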
2507 static int net_server(void)
2508 {
2509         int fd, opt;
2510         int ret = 1;
2511         struct net_server_s net_server;
2512         struct net_server_s *ns = &net_server;
2513
2514         memset(ns, 0, sizeof(*ns));
2515         INIT_LIST_HEAD(&ns->ch_list);
2516         INIT_LIST_HEAD(&ns->conn_list);
2517         ns->pfds = malloc(sizeof(struct pollfd));
2518
2519         fd = my_socket(AF_INET, SOCK_STREAM, 0);
2520         if (fd < 0) {
2521                 perror("server: socket");
2522                 goto out;
2523         }
2524
2525         opt = 1;
2526         if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) {
2527                 perror("setsockopt");
2528                 goto out;
2529         }
2530
2531         memset(&ns->addr, 0, sizeof(ns->addr));
2532         ns->addr.sin_family = AF_INET;
2533         ns->addr.sin_addr.s_addr = htonl(INADDR_ANY);
2534         ns->addr.sin_port = htons(net_port);
2535
2536         if (bind(fd, (struct sockaddr *) &ns->addr, sizeof(ns->addr)) < 0) {
2537                 perror("bind");
2538                 goto out;
2539         }
2540
2541         if (listen(fd, 1) < 0) {
2542                 perror("listen");
2543                 goto out;
2544         }
2545
2546         /*
2547          * The actual server looping is done here:
2548          */
2549         ns->listen_fd = fd;
2550         ret = net_server_handle_connections(ns);
2551
2552         /*
2553          * Clean up and return...
2554          */
2555 out:
2556         if (fd >= 0)
                     close(fd);
             free(ns->pfds);
2557         return ret;
2558 }
2559
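     /*
      * Normal (non-server) operation: set up the kernel trace buffers
      * (setup_buts), start one tracer thread per CPU, and only start the
      * traces once every thread has come up; otherwise tear it all back
      * down. stop_watch, if set, bounds the run via SIGALRM.
      */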
2560 static int run_tracers(void)
2561 {
2562         atexit(exit_tracing);
2563         if (net_mode == Net_client)
2564                 printf("blktrace: connecting to %s\n", hostname);
2565
2566         setup_buts();
2567
2568         if (use_tracer_devpaths()) {
2569                 if (setup_tracer_devpaths())
2570                         return 1;
2571
2572                 if (piped_output)
2573                         handle_list = handle_list_file;
2574                 else
2575                         handle_list = handle_list_net;
2576         }
2577
2578         start_tracers();
2579         if (nthreads_running == ncpus) {
2580                 unblock_tracers();
2581                 start_buts();
2582                 if (net_mode == Net_client)
2583                         printf("blktrace: connected!\n");
2584                 if (stop_watch)
2585                         alarm(stop_watch);
2586         } else
2587                 stop_tracers();
2588
2589         wait_tracers();
2590         if (nthreads_running == ncpus)
2591                 show_stats(&devpaths);
2592         if (net_client_use_send())
2593                 close_client_connections();
2594         del_tracers();
2595
2596         return 0;
2597 }
2598
2599 int main(int argc, char *argv[])
2600 {
2601         int ret = 0;
2602
2603         setlocale(LC_NUMERIC, "en_US");
2604         pagesize = getpagesize();
2605         ncpus = sysconf(_SC_NPROCESSORS_ONLN);
2606         if (ncpus < 0) {
2607                 fprintf(stderr, "sysconf(_SC_NPROCESSORS_ONLN) failed %d/%s\n",
2608                         errno, strerror(errno));
2609                 ret = 1;
2610                 goto out;
2611         } else if (handle_args(argc, argv)) {
2612                 ret = 1;
2613                 goto out;
2614         }
2615
2616         signal(SIGINT, handle_sigint);
2617         signal(SIGHUP, handle_sigint);
2618         signal(SIGTERM, handle_sigint);
2619         signal(SIGALRM, handle_sigint);
2620         signal(SIGPIPE, SIG_IGN);
2621
2622         if (kill_running_trace) {
2623                 struct devpath *dpp;
2624                 struct list_head *p;
2625
2626                 __list_for_each(p, &devpaths) {
2627                         dpp = list_entry(p, struct devpath, head);
2628                         if (__stop_trace(dpp->fd)) {
2629                                 fprintf(stderr,
2630                                         "BLKTRACETEARDOWN %s failed: %d/%s\n",
2631                                         dpp->path, errno, strerror(errno));
2632                         }
2633                 }
2634         } else if (net_mode == Net_server) {
2635                 if (output_name) {
2636                         fprintf(stderr, "-o ignored in server mode\n");
2637                         output_name = NULL;
2638                 }
2639                 ret = net_server();
2640         } else
2641                 ret = run_tracers();
2642
2643 out:
2644         if (pfp)
2645                 fclose(pfp);
2646         rel_devpaths();
2647         return ret;
2648 }