Code review updates
[blktrace.git] / blktrace.c
1 /*
2  * block queue tracing application
3  *
4  * Copyright (C) 2005 Jens Axboe <axboe@suse.de>
5  * Copyright (C) 2006 Jens Axboe <axboe@kernel.dk>
6  *
7  * Rewrite to have a single thread per CPU (managing all devices on that CPU)
8  *      Alan D. Brunelle <alan.brunelle@hp.com> - January 2009
9  *
10  *  This program is free software; you can redistribute it and/or modify
11  *  it under the terms of the GNU General Public License as published by
12  *  the Free Software Foundation; either version 2 of the License, or
13  *  (at your option) any later version.
14  *
15  *  This program is distributed in the hope that it will be useful,
16  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
17  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
18  *  GNU General Public License for more details.
19  *
20  *  You should have received a copy of the GNU General Public License
21  *  along with this program; if not, write to the Free Software
22  *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
23  *
24  */
25
26 #include <errno.h>
27 #include <stdarg.h>
28 #include <stdio.h>
29 #include <stdlib.h>
30 #include <string.h>
31 #include <fcntl.h>
32 #include <getopt.h>
33 #include <sched.h>
34 #include <unistd.h>
35 #include <poll.h>
36 #include <signal.h>
37 #include <pthread.h>
38 #include <locale.h>
39 #include <sys/ioctl.h>
40 #include <sys/types.h>
41 #include <sys/stat.h>
42 #include <sys/vfs.h>
43 #include <sys/mman.h>
44 #include <sys/param.h>
45 #include <sys/time.h>
46 #include <sys/resource.h>
47 #include <sys/socket.h>
48 #include <netinet/in.h>
49 #include <arpa/inet.h>
50 #include <netdb.h>
51 #include <sys/sendfile.h>
52
53 #include "btt/list.h"
54 #include "blktrace.h"
55
56 /*
57  * You may want to increase this even more, if you are logging at a high
58  * rate and see skipped/missed events
59  */
60 #define BUF_SIZE                (512 * 1024)
61 #define BUF_NR                  (4)
62
63 #define FILE_VBUF_SIZE          (128 * 1024)
64
65 #define DEBUGFS_TYPE            (0x64626720)
66 #define TRACE_NET_PORT          (8462)
67
/*
 * Network participation mode: stand-alone (none), server (-l), or
 * client (-h <host>).
 */
enum {
	Net_none = 0,
	Net_server,
	Net_client,
};

/*
 * Lifecycle states a tracer thread reports back to the main thread
 * via tracer_signal_ready().
 */
enum thread_status {
	Th_running,
	Th_leaving,
	Th_error
};
79
/*
 * Generic stats collected: nevents can be _roughly_ estimated by data_read
 * (discounting pdu...)
 *
 * These fields are updated w/ pdc_dr_update & pdc_nev_update below.
 */
struct pdc_stats {
	unsigned long long data_read;	/* total trace payload bytes consumed */
	unsigned long long nevents;	/* number of trace events accounted */
};
90
/*
 * One device being traced: identity, kernel trace handle, and the
 * per-output-mode (piped / network server) state hanging off it.
 */
struct devpath {
	struct list_head head;		/* linkage on global devpaths list */
	char *path;			/* path to device special file */
	char *buts_name;		/* name returned from bt kernel code */
	struct pdc_stats *stats;	/* per-cpu stats, [0..ncpus) */
	int fd, idx, ncpus;		/* device fd, device index, cpu count */
	unsigned long long drops;	/* events dropped by the kernel */

	/*
	 * For piped output only:
	 *
	 * Each tracer will have a tracer_devpath_head that it will add new
	 * data onto. It's list is protected above (tracer_devpath_head.mutex)
	 * and it will signal the processing thread using the dp_cond,
	 * dp_mutex & dp_entries variables above.
	 */
	struct tracer_devpath_head *heads;

	/*
	 * For network server mode only:
	 */
	struct cl_host *ch;		/* client host that owns this device */
	u32 cl_id;			/* id of the client's connection set */
	time_t cl_connect_time;		/* when the client connected */
	struct io_info *ios;		/* per-cpu I/O state for this device */
};
117
118 /*
119  * For piped output to stdout we will have each tracer thread (one per dev)
120  * tack buffers read from the relay queues on a per-device list.
121  *
122  * The main thread will then collect trace buffers from each of lists in turn.
123  *
124  * We will use a mutex to guard each of the trace_buf list. The tracers
125  * can then signal the main thread using <dp_cond,dp_mutex> and
126  * dp_entries. (When dp_entries is 0, and a tracer adds an entry it will
127  * signal. When dp_entries is 0, the main thread will wait for that condition
128  * to be signalled.)
129  *
130  * adb: It may be better just to have a large buffer per tracer per dev,
131  * and then use it as a ring-buffer. This would certainly cut down a lot
132  * of malloc/free thrashing, at the cost of more memory movements (potentially).
133  */
/* One chunk of trace data read from a CPU's relay channel, queued for
 * the main (piped-output) thread. */
struct trace_buf {
	struct list_head head;		/* linkage on tracer_devpath_head.head */
	struct devpath *dpp;		/* device the data came from */
	void *buf;			/* the raw trace data */
	int cpu, len;			/* source cpu, payload length in bytes */
};
140
/* Per-tracer, per-device queue of trace_buf entries awaiting processing. */
struct tracer_devpath_head {
	pthread_mutex_t mutex;		/* guards 'head' (see comment above) */
	struct list_head head;		/* queued trace_buf entries */
	struct trace_buf *prev;		/* NOTE(review): held-back buffer; its
					 * handling is outside this chunk —
					 * confirm before relying on semantics */
};
146
/*
 * Used to handle the mmap() interfaces for output file (containing traces)
 */
struct mmap_info {
	void *fs_buf;			/* currently mapped output window */
	unsigned long long fs_size, fs_max_size, fs_off, fs_buf_len;
	unsigned long buf_size, buf_nr;	/* sub-buffer geometry (-b / -n) */
	int pagesize;			/* system page size */
};
156
/*
 * Each thread doing work on a (client) side of blktrace will have one
 * of these. The ios array contains input/output information, pfds holds
 * poll() data. The volatile's provide flags to/from the main executing
 * thread.
 */
struct tracer {
	struct list_head head;		/* linkage on global tracers list */
	struct io_info *ios;		/* one entry per device (nios) */
	struct pollfd *pfds;		/* poll() set matching ios */
	pthread_t thread;		/* the tracer thread itself */
	int cpu, nios;			/* cpu this tracer handles, #ios */
	volatile int status, is_done;	/* result code / shutdown request */
};
171
/*
 * networking stuff follows. we include a magic number so we know whether
 * to endianness convert or not.
 *
 * The len field is overloaded:
 *	0 - Indicates an "open" - allowing the server to set up for a dev/cpu
 *	1 - Indicates a "close" - Shut down connection orderly
 *	2 - Server acknowledgement of an open/close (see ack_open_close())
 *
 * The cpu field is overloaded on close: it will contain the number of drops.
 */
struct blktrace_net_hdr {
	u32 magic;		/* same as trace magic */
	char buts_name[32];	/* trace name */
	u32 cpu;		/* for which cpu */
	u32 max_cpus;		/* total cpus on the sending client */
	u32 len;		/* length of following trace data */
	u32 cl_id;		/* id for set of client per-cpu connections */
	u32 buf_size;		/* client buf_size for this trace  */
	u32 buf_nr;		/* client buf_nr for this trace  */
	u32 page_size;		/* client page_size for this trace  */
};
193
/*
 * Each host encountered has one of these. The head is used to link this
 * on to the network server's ch_list. Connections associated with this
 * host are linked on conn_list, and any devices traced on that host
 * are connected on the devpaths list.
 */
struct cl_host {
	struct list_head head;		/* linkage on net_server_s.ch_list */
	struct list_head conn_list;	/* cl_conn entries for this host */
	struct list_head devpaths;	/* devpath entries traced remotely */
	struct net_server_s *ns;	/* back pointer to the server */
	char *hostname;			/* resolved client host name */
	struct in_addr cl_in_addr;	/* client address (host identity) */
	int connects, ndevs, cl_opens;	/* connection / device / open counts */
};
209
/*
 * Each connection (client to server socket ('fd')) has one of these. A
 * back reference to the host ('ch'), and lists headers (for the host
 * list, and the network server conn_list) are also included.
 */
struct cl_conn {
	struct list_head ch_head, ns_head;	/* host / server linkage */
	struct cl_host *ch;			/* owning host */
	int fd, ncpus;				/* socket, client cpu count */
	time_t connect_time;			/* when connection was made */
};
221
/*
 * The network server requires some poll structures to be maintained -
 * one per connection currently on conn_list. The nchs/ch_list values
 * are for each host connected to this server. The addr field is used
 * for scratch as new connections are established.
 */
struct net_server_s {
	struct list_head conn_list;	/* active cl_conn entries */
	struct list_head ch_list;	/* known cl_host entries */
	struct pollfd *pfds;		/* one pollfd per connection */
	int listen_fd, connects, nchs;	/* listen socket and counts */
	struct sockaddr_in addr;	/* scratch for accept() */
};
235
/*
 * This structure is (generically) used to provide information
 * for a read-to-write set of values.
 *
 * ifn & ifd represent input information
 *
 * ofn, ofd, ofp, obuf & mmap_info are used for output file (optionally).
 */
struct io_info {
	struct devpath *dpp;	/* device this I/O stream belongs to */
	FILE *ofp;		/* buffered output stream */
	char *obuf;		/* presumably a setvbuf() buffer for ofp
				 * (FILE_VBUF_SIZE) — confirm at use site */
	struct cl_conn *nc;	/* Server network connection */

	/*
	 * mmap controlled output files
	 */
	struct mmap_info mmap_info;

	/*
	 * Client network fields
	 */
	unsigned int ready;
	unsigned long long data_queued;

	/*
	 * Input/output file descriptors & names
	 */
	int ifd, ofd;
	char ifn[MAXPATHLEN + 64];
	char ofn[MAXPATHLEN + 64];
};
268
static char blktrace_version[] = "2.0.0";

/*
 * Linkage to blktrace helper routines (trace conversions)
 */
int data_is_native = -1;

static int ndevs;			/* number of devices being traced */
static int ncpus;			/* number of cpus on this system */
static int pagesize;			/* system page size */
static int act_mask = ~0U;		/* action filter (-a / -A) */
static int kill_running_trace;		/* -k: tear down a running trace */
static int stop_watch;			/* -w: stop after N seconds */
static int piped_output;		/* non-zero when output is piped —
					 * see write_data()'s EPIPE handling */

static char *debugfs_path = "/sys/kernel/debug";
static char *output_name;		/* -o: output file base name */
static char *output_dir;		/* -D: directory for output files */

static unsigned long buf_size = BUF_SIZE;	/* -b: sub-buffer size */
static unsigned long buf_nr = BUF_NR;		/* -n: sub-buffer count */

static FILE *pfp;			/* piped-output stream */

static LIST_HEAD(devpaths);		/* all devpath entries */
static LIST_HEAD(tracers);		/* all tracer threads */

static volatile int done;		/* global "stop everything" flag */

/*
 * tracer threads add entries, the main thread takes them off and processes
 * them. These protect the dp_entries variable.
 */
static pthread_cond_t dp_cond = PTHREAD_COND_INITIALIZER;
static pthread_mutex_t dp_mutex = PTHREAD_MUTEX_INITIALIZER;
static volatile int dp_entries;

/*
 * These synchronize master / thread interactions.
 */
static pthread_cond_t mt_cond = PTHREAD_COND_INITIALIZER;
static pthread_mutex_t mt_mutex = PTHREAD_MUTEX_INITIALIZER;
static volatile int nthreads_running;
static volatile int nthreads_leaving;
static volatile int nthreads_error;
static volatile int tracers_run;

/*
 * network cmd line params
 */
static struct sockaddr_in hostname_addr;	/* resolved -h address */
static char hostname[MAXHOSTNAMELEN];		/* -h: server host name */
static int net_port = TRACE_NET_PORT;		/* -p: server port */
static int net_use_sendfile = 1;		/* cleared by -s */
static int net_mode;				/* Net_none/server/client */
static int *cl_fds;				/* per-cpu client sockets */

/* dispatch hooks, selected per output mode */
static int (*handle_pfds)(struct tracer *, int, int);
static int (*handle_list)(struct tracer_devpath_head *, struct list_head *);
328
/* Short option string; long options below mirror it one-to-one */
#define S_OPTS	"d:a:A:r:o:kw:vVb:n:D:lh:p:sI:"
static struct option l_opts[] = {
	{
		.name = "dev",
		.has_arg = required_argument,
		.flag = NULL,
		.val = 'd'
	},
	{
		.name = "input-devs",
		.has_arg = required_argument,
		.flag = NULL,
		.val = 'I'
	},
	{
		.name = "act-mask",
		.has_arg = required_argument,
		.flag = NULL,
		.val = 'a'
	},
	{
		.name = "set-mask",
		.has_arg = required_argument,
		.flag = NULL,
		.val = 'A'
	},
	{
		.name = "relay",
		.has_arg = required_argument,
		.flag = NULL,
		.val = 'r'
	},
	{
		.name = "output",
		.has_arg = required_argument,
		.flag = NULL,
		.val = 'o'
	},
	{
		.name = "kill",
		.has_arg = no_argument,
		.flag = NULL,
		.val = 'k'
	},
	{
		.name = "stopwatch",
		.has_arg = required_argument,
		.flag = NULL,
		.val = 'w'
	},
	{
		/*
		 * NOTE(review): "version" appears twice in this table; since
		 * getopt_long() matches the first entry, --version always maps
		 * to 'v' and the 'V' entry below is unreachable via its long
		 * name. Left as-is to avoid changing the CLI.
		 */
		.name = "version",
		.has_arg = no_argument,
		.flag = NULL,
		.val = 'v'
	},
	{
		.name = "version",
		.has_arg = no_argument,
		.flag = NULL,
		.val = 'V'
	},
	{
		.name = "buffer-size",
		.has_arg = required_argument,
		.flag = NULL,
		.val = 'b'
	},
	{
		.name = "num-sub-buffers",
		.has_arg = required_argument,
		.flag = NULL,
		.val = 'n'
	},
	{
		.name = "output-dir",
		.has_arg = required_argument,
		.flag = NULL,
		.val = 'D'
	},
	{
		.name = "listen",
		.has_arg = no_argument,
		.flag = NULL,
		.val = 'l'
	},
	{
		.name = "host",
		.has_arg = required_argument,
		.flag = NULL,
		.val = 'h'
	},
	{
		.name = "port",
		.has_arg = required_argument,
		.flag = NULL,
		.val = 'p'
	},
	{
		.name = "no-sendfile",
		.has_arg = no_argument,
		.flag = NULL,
		.val = 's'
	},
	{
		.name = NULL,
	}
};
437
/* Help text emitted by show_usage(); keep in sync with S_OPTS/l_opts */
static char usage_str[] = \
	"-d <dev> [ -r debugfs path ] [ -o <output> ] [-k ] [ -w time ]\n" \
	"[ -a action ] [ -A action mask ] [ -I  <devs file> ] [ -v ]\n\n" \
	"\t-d Use specified device. May also be given last after options\n" \
	"\t-r Path to mounted debugfs, defaults to /sys/kernel/debug\n" \
	"\t-o File(s) to send output to\n" \
	"\t-D Directory to prepend to output file names\n" \
	"\t-k Kill a running trace\n" \
	"\t-w Stop after defined time, in seconds\n" \
	"\t-a Only trace specified actions. See documentation\n" \
	"\t-A Give trace mask as a single value. See documentation\n" \
	"\t-b Sub buffer size in KiB\n" \
	"\t-n Number of sub buffers\n" \
	"\t-l Run in network listen mode (blktrace server)\n" \
	"\t-h Run in network client mode, connecting to the given host\n" \
	"\t-p Network port to use (default 8462)\n" \
	"\t-s Make the network client NOT use sendfile() to transfer data\n" \
	"\t-I Add devices found in <devs file>\n" \
	"\t-V Print program version info\n\n";
457
/*
 * Disarm a pollfd: no requested events, no stale returned events.
 */
static void clear_events(struct pollfd *pfd)
{
	pfd->revents = 0;
	pfd->events = 0;
}
463
464 static inline int net_client_use_sendfile(void)
465 {
466         return net_mode == Net_client && net_use_sendfile;
467 }
468
469 static inline int net_client_use_send(void)
470 {
471         return net_mode == Net_client && !net_use_sendfile;
472 }
473
474 static inline int use_tracer_devpaths(void)
475 {
476         return piped_output || net_client_use_send();
477 }
478
/*
 * Compare two IPv4 addresses for equality (1 if equal, 0 otherwise).
 */
static inline int in_addr_eq(struct in_addr a, struct in_addr b)
{
	return (a.s_addr == b.s_addr) ? 1 : 0;
}
483
484 static inline void pdc_dr_update(struct devpath *dpp, int cpu, int data_read)
485 {
486         dpp->stats[cpu].data_read += data_read;
487 }
488
489 static inline void pdc_nev_update(struct devpath *dpp, int cpu, int nevents)
490 {
491         dpp->stats[cpu].nevents += nevents;
492 }
493
/* Print program name, version and the full usage text to stderr. */
static void show_usage(char *prog)
{
	fprintf(stderr, "Usage: %s %s %s", prog, blktrace_version, usage_str);
}
498
499 /*
500  * Create a timespec 'msec' milliseconds into the future
501  */
/*
 * Create a timespec 'delta_msec' milliseconds into the future.
 *
 * Fix: normalize with >= 1000000000 — tv_nsec must lie in [0, 1e9);
 * the old '>' test could leave tv_nsec exactly equal to 1e9, which is
 * an invalid timespec for pthread_cond_timedwait() (EINVAL).
 */
static inline void make_timespec(struct timespec *tsp, long delta_msec)
{
	struct timeval now;

	gettimeofday(&now, NULL);
	tsp->tv_sec = now.tv_sec;
	tsp->tv_nsec = 1000L * now.tv_usec;

	tsp->tv_nsec += (delta_msec * 1000000L);
	if (tsp->tv_nsec >= 1000000000L) {
		long secs = tsp->tv_nsec / 1000000000L;

		tsp->tv_sec += secs;
		tsp->tv_nsec -= (secs * 1000000000L);
	}
}
518
519 /*
520  * Add a timer to ensure wait ends
521  */
/*
 * Bounded condition wait: like pthread_cond_wait(), but gives up after
 * ~50ms so callers re-check their predicate even if a wakeup is missed.
 */
static void t_pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)
{
	struct timespec deadline;

	make_timespec(&deadline, 50);
	pthread_cond_timedwait(cond, mutex, &deadline);
}
529
/*
 * Main thread: release every tracer thread parked in
 * tracer_wait_unblock() by setting tracers_run under mt_mutex.
 */
static void unblock_tracers(void)
{
	pthread_mutex_lock(&mt_mutex);
	tracers_run = 1;
	pthread_cond_broadcast(&mt_cond);
	pthread_mutex_unlock(&mt_mutex);
}
537
/*
 * Tracer thread: block until the main thread sets tracers_run (or this
 * tracer is asked to exit via tp->is_done).
 */
static void tracer_wait_unblock(struct tracer *tp)
{
	pthread_mutex_lock(&mt_mutex);
	while (!tp->is_done && !tracers_run)
		pthread_cond_wait(&mt_cond, &mt_mutex);
	pthread_mutex_unlock(&mt_mutex);
}
545
/*
 * Tracer thread: report a state transition (running / error / leaving)
 * and its status code to the main thread, then wake it.
 */
static void tracer_signal_ready(struct tracer *tp,
				enum thread_status th_status,
				int status)
{
	pthread_mutex_lock(&mt_mutex);
	tp->status = status;

	/* bump the counter matching the state we are entering */
	if (th_status == Th_running)
		nthreads_running++;
	else if (th_status == Th_error)
		nthreads_error++;
	else
		nthreads_leaving++;

	pthread_cond_signal(&mt_cond);
	pthread_mutex_unlock(&mt_mutex);
}
563
/*
 * Main thread: wait until every started tracer has reported either
 * running or error. Uses the timed wait so a lost signal cannot hang us.
 */
static void wait_tracers_ready(int ncpus_started)
{
	pthread_mutex_lock(&mt_mutex);
	while ((nthreads_running + nthreads_error) < ncpus_started)
		t_pthread_cond_wait(&mt_cond, &mt_mutex);
	pthread_mutex_unlock(&mt_mutex);
}
571
/*
 * Main thread: wait until every tracer that got to "running" has
 * reported that it is leaving.
 */
static void wait_tracers_leaving(void)
{
	pthread_mutex_lock(&mt_mutex);
	while (nthreads_leaving < nthreads_running)
		t_pthread_cond_wait(&mt_cond, &mt_mutex);
	pthread_mutex_unlock(&mt_mutex);
}
579
580 static void init_mmap_info(struct mmap_info *mip)
581 {
582         mip->buf_size = buf_size;
583         mip->buf_nr = buf_nr;
584         mip->pagesize = pagesize;
585 }
586
/*
 * Orderly shutdown of a socket; poison *fd with -1 so callers can tell
 * the connection is gone.
 */
static void net_close_connection(int *fd)
{
	int sock = *fd;

	*fd = -1;
	shutdown(sock, SHUT_RDWR);
	close(sock);
}
593
594 static void dpp_free(struct devpath *dpp)
595 {
596         if (dpp->stats)
597                 free(dpp->stats);
598         if (dpp->ios)
599                 free(dpp->ios);
600         if (dpp->path)
601                 free(dpp->path);
602         if (dpp->buts_name)
603                 free(dpp->buts_name);
604         free(dpp);
605 }
606
/*
 * Pin the calling process onto a single CPU.
 *
 * Returns 0 on success, or the errno from sched_setaffinity() on failure.
 */
static int lock_on_cpu(int cpu)
{
	cpu_set_t mask;

	CPU_ZERO(&mask);
	CPU_SET(cpu, &mask);

	return sched_setaffinity(getpid(), sizeof(mask), &mask) < 0 ? errno : 0;
}
618
/*
 * Bump a resource limit by 'increase', raising the hard limit as well
 * when the new soft limit would exceed it.
 *
 * Returns 1 on success; 0 on failure, with errno restored to its value
 * on entry so the caller's original failure reason is preserved.
 */
static int increase_limit(int resource, rlim_t increase)
{
	int old_errno = errno;
	struct rlimit rlim;

	if (getrlimit(resource, &rlim) == 0) {
		rlim.rlim_cur += increase;
		if (rlim.rlim_cur >= rlim.rlim_max)
			rlim.rlim_max = rlim.rlim_cur + increase;

		if (setrlimit(resource, &rlim) == 0)
			return 1;
	}

	errno = old_errno;
	return 0;
}
636
/*
 * Called after an open/socket/accept failure: if we ran out of file
 * descriptors, try raising RLIMIT_NOFILE. Returns non-zero when a
 * retry is worthwhile.
 */
static int handle_open_failure(void)
{
	switch (errno) {
	case ENFILE:
	case EMFILE:
		return increase_limit(RLIMIT_NOFILE, 16);
	default:
		return 0;
	}
}
643
644 static int handle_mem_failure(size_t length)
645 {
646         if (errno == ENFILE)
647                 return handle_open_failure();
648         else if (errno == ENOMEM)
649                 return increase_limit(RLIMIT_MEMLOCK, 2 * length);
650         return 0;
651 }
652
/*
 * fopen() wrapper: on fd exhaustion, try raising the limit and retry.
 */
static FILE *my_fopen(const char *path, const char *mode)
{
	FILE *fp = fopen(path, mode);

	while (fp == NULL && handle_open_failure())
		fp = fopen(path, mode);

	return fp;
}
663
/*
 * open() wrapper: on fd exhaustion, try raising the limit and retry.
 */
static int my_open(const char *path, int flags)
{
	int fd = open(path, flags);

	while (fd < 0 && handle_open_failure())
		fd = open(path, flags);

	return fd;
}
674
/*
 * socket() wrapper: on fd exhaustion, try raising the limit and retry.
 */
static int my_socket(int domain, int type, int protocol)
{
	int fd = socket(domain, type, protocol);

	while (fd < 0 && handle_open_failure())
		fd = socket(domain, type, protocol);

	return fd;
}
685
/*
 * accept() wrapper: on fd exhaustion, try raising the limit and retry.
 */
static int my_accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen)
{
	int fd = accept(sockfd, addr, addrlen);

	while (fd < 0 && handle_open_failure())
		fd = accept(sockfd, addr, addrlen);

	return fd;
}
696
/*
 * mmap() wrapper: on memory/fd pressure, try raising limits and retry.
 * (Also renames the local away from 'new', which trips up C++ tooling.)
 */
static void *my_mmap(void *addr, size_t length, int prot, int flags, int fd,
		     off_t offset)
{
	void *p = mmap(addr, length, prot, flags, fd, offset);

	while (p == MAP_FAILED && handle_mem_failure(length))
		p = mmap(addr, length, prot, flags, fd, offset);

	return p;
}
708
/*
 * mlock() wrapper: on RLIMIT_MEMLOCK pressure, try raising it and retry.
 */
static int my_mlock(const void *addr, size_t len)
{
	int ret = mlock(addr, len);

	while (ret < 0 && handle_mem_failure(len))
		ret = mlock(addr, len);

	return ret;
}
719
/*
 * Ensure the output file's mmap()ed window can absorb 'maxlen' more
 * bytes at the current append offset; extend the file and re-map a
 * fresh window when it cannot. Returns 0 on success, 1 on failure.
 */
static int setup_mmap(int fd, unsigned int maxlen, struct mmap_info *mip)
{
	if (mip->fs_off + maxlen > mip->fs_buf_len) {
		/* map at least 16 sub-buffers' worth at a time */
		unsigned long nr = max(16, mip->buf_nr);

		if (mip->fs_buf) {
			munlock(mip->fs_buf, mip->fs_buf_len);
			munmap(mip->fs_buf, mip->fs_buf_len);
			mip->fs_buf = NULL;
		}

		/* window starts on the page containing the append offset */
		mip->fs_off = mip->fs_size & (mip->pagesize - 1);
		mip->fs_buf_len = (nr * mip->buf_size) - mip->fs_off;
		mip->fs_max_size += mip->fs_buf_len;

		/* grow the file so the whole window is backed */
		if (ftruncate(fd, mip->fs_max_size) < 0) {
			perror("setup_mmap: ftruncate");
			return 1;
		}

		mip->fs_buf = my_mmap(NULL, mip->fs_buf_len, PROT_WRITE,
				      MAP_SHARED, fd,
				      mip->fs_size - mip->fs_off);
		if (mip->fs_buf == MAP_FAILED) {
			perror("setup_mmap: mmap");
			return 1;
		}
		/* pin the window so copies into it cannot page-fault out */
		my_mlock(mip->fs_buf, mip->fs_buf_len);
	}

	return 0;
}
752
753 static int __stop_trace(int fd)
754 {
755         /*
756          * Should be stopped, don't complain if it isn't
757          */
758         ioctl(fd, BLKTRACESTOP);
759         return ioctl(fd, BLKTRACETEARDOWN);
760 }
761
/*
 * Write 'len' bytes to the piped-output stream (pfp), restarting the
 * fwrite on EINTR. Returns 0 on success, 1 on error.
 */
static int write_data(char *buf, int len)
{
	int ret;

rewrite:
	ret = fwrite(buf, len, 1, pfp);
	if (ferror(pfp) || ret != 1) {
		if (errno == EINTR) {
			clearerr(pfp);
			goto rewrite;
		}

		/*
		 * A broken pipe on piped output just means the reader went
		 * away; don't spam stderr for that case.
		 */
		if (!piped_output || (errno != EPIPE && errno != EBADF)) {
			fprintf(stderr, "write(%d) failed: %d/%s\n",
				len, errno, strerror(errno));
		}
		goto err;
	}

	fflush(pfp);
	return 0;

err:
	clearerr(pfp);
	return 1;
}
788
789 /*
790  * Returns the number of bytes read (successfully)
791  */
792 static int __net_recv_data(int fd, void *buf, unsigned int len)
793 {
794         unsigned int bytes_left = len;
795
796         while (bytes_left && !done) {
797                 int ret = recv(fd, buf, bytes_left, MSG_WAITALL);
798
799                 if (ret == 0)
800                         break;
801                 else if (ret < 0) {
802                         if (errno != EAGAIN)
803                                 perror("server: net_recv_data: recv failed");
804                         break;
805                 } else {
806                         buf += ret;
807                         bytes_left -= ret;
808                 }
809         }
810
811         return len - bytes_left;
812 }
813
/* Public entry point for receiving; returns bytes read (see above). */
static int net_recv_data(int fd, void *buf, unsigned int len)
{
	return __net_recv_data(fd, buf, len);
}
818
819 /*
820  * Returns number of bytes written
821  */
/*
 * Push 'buf_len' bytes to 'fd', looping over short sends.
 *
 * Returns the number of bytes actually written (== buf_len on success).
 *
 * Fix: advance through the buffer with a char * cursor — arithmetic on
 * a void * is a GCC extension, not standard C.
 */
static int net_send_data(int fd, void *buf, unsigned int buf_len)
{
	char *p = buf;
	unsigned int bytes_left = buf_len;

	while (bytes_left) {
		int ret = send(fd, p, bytes_left, 0);

		if (ret < 0) {
			perror("send");
			break;
		}

		p += ret;
		bytes_left -= ret;
	}

	return buf_len - bytes_left;
}
840
841 static int net_send_header(int fd, int cpu, char *buts_name, int len)
842 {
843         struct blktrace_net_hdr hdr;
844
845         memset(&hdr, 0, sizeof(hdr));
846
847         hdr.magic = BLK_IO_TRACE_MAGIC;
848         strncpy(hdr.buts_name, buts_name, sizeof(hdr.buts_name));
849         hdr.buts_name[sizeof(hdr.buts_name)-1] = '\0';
850         hdr.cpu = cpu;
851         hdr.max_cpus = ncpus;
852         hdr.len = len;
853         hdr.cl_id = getpid();
854         hdr.buf_size = buf_size;
855         hdr.buf_nr = buf_nr;
856         hdr.page_size = pagesize;
857
858         return net_send_data(fd, &hdr, sizeof(hdr)) != sizeof(hdr);
859 }
860
861 static void net_send_open_close(int fd, int cpu, char *buts_name, int len)
862 {
863         struct blktrace_net_hdr ret_hdr;
864
865         net_send_header(fd, cpu, buts_name, len);
866         net_recv_data(fd, &ret_hdr, sizeof(ret_hdr));
867 }
868
/* Announce a new per-cpu trace stream to the server (len == 0 -> open). */
static void net_send_open(int fd, int cpu, char *buts_name)
{
	net_send_open_close(fd, cpu, buts_name, 0);
}
873
/*
 * Tell the server this trace stream is finished (len == 1), passing
 * the drop count in the cpu field.
 */
static void net_send_close(int fd, char *buts_name, int drops)
{
	/*
	 * Overload CPU w/ number of drops
	 *
	 * XXX: Need to clear/set done around call - done=1 (which
	 * is true here) stops reads from happening... :-(
	 */
	done = 0;
	net_send_open_close(fd, drops, buts_name, 1);
	done = 1;
}
886
/* Server side: acknowledge a client's open/close message (len == 2). */
static void ack_open_close(int fd, char *buts_name)
{
	net_send_header(fd, 0, buts_name, 2);
}
891
892 static void net_send_drops(int fd)
893 {
894         struct list_head *p;
895
896         __list_for_each(p, &devpaths) {
897                 struct devpath *dpp = list_entry(p, struct devpath, head);
898
899                 net_send_close(fd, dpp->buts_name, dpp->drops);
900         }
901 }
902
903 /*
904  * Returns:
905  *       0: "EOF"
906  *       1: OK
907  *      -1: Error
908  */
909 static int net_get_header(struct cl_conn *nc, struct blktrace_net_hdr *bnh)
910 {
911         int bytes_read;
912         int fl = fcntl(nc->fd, F_GETFL);
913
914         fcntl(nc->fd, F_SETFL, fl | O_NONBLOCK);
915         bytes_read = __net_recv_data(nc->fd, bnh, sizeof(*bnh));
916         fcntl(nc->fd, F_SETFL, fl & ~O_NONBLOCK);
917
918         if (bytes_read == sizeof(*bnh))
919                 return 1;
920         else if (bytes_read == 0)
921                 return 0;
922         else
923                 return -1;
924 }
925
926 static int net_setup_addr(void)
927 {
928         struct sockaddr_in *addr = &hostname_addr;
929
930         memset(addr, 0, sizeof(*addr));
931         addr->sin_family = AF_INET;
932         addr->sin_port = htons(net_port);
933
934         if (inet_aton(hostname, &addr->sin_addr) != 1) {
935                 struct hostent *hent;
936 retry:
937                 hent = gethostbyname(hostname);
938                 if (!hent) {
939                         if (h_errno == TRY_AGAIN) {
940                                 usleep(100);
941                                 goto retry;
942                         } else if (h_errno == NO_RECOVERY) {
943                                 fprintf(stderr, "gethostbyname(%s)"
944                                         "non-recoverable error encountered\n",
945                                         hostname);
946                         } else {
947                                 /*
948                                  * HOST_NOT_FOUND, NO_ADDRESS or NO_DATA
949                                  */
950                                 fprintf(stderr, "Host %s not found\n",
951                                         hostname);
952                         }
953                         return 1;
954                 }
955
956                 memcpy(&addr->sin_addr, hent->h_addr, 4);
957                 strcpy(hostname, hent->h_name);
958         }
959
960         return 0;
961 }
962
963 static int net_setup_client(void)
964 {
965         int fd;
966         struct sockaddr_in *addr = &hostname_addr;
967
968         fd = my_socket(AF_INET, SOCK_STREAM, 0);
969         if (fd < 0) {
970                 perror("client: socket");
971                 return -1;
972         }
973
974         if (connect(fd, (struct sockaddr *)addr, sizeof(*addr)) < 0) {
975                 if (errno == ECONNREFUSED)
976                         fprintf(stderr,
977                                 "\nclient: Connection to %s refused, "
978                                 "perhaps the server is not started?\n\n",
979                                 hostname);
980                 else
981                         perror("client: connect");
982
983                 close(fd);
984                 return -1;
985         }
986
987         return fd;
988 }
989
990 static int open_client_connections(void)
991 {
992         int cpu;
993
994         cl_fds = calloc(ncpus, sizeof(*cl_fds));
995         for (cpu = 0; cpu < ncpus; cpu++) {
996                 cl_fds[cpu] = net_setup_client();
997                 if (cl_fds[cpu] < 0)
998                         goto err;
999         }
1000         return 0;
1001
1002 err:
1003         while (cpu > 0)
1004                 close(cl_fds[cpu--]);
1005         free(cl_fds);
1006         return 1;
1007 }
1008
1009 static void close_client_connections(void)
1010 {
1011         if (cl_fds) {
1012                 int cpu, *fdp;
1013
1014                 for (cpu = 0, fdp = cl_fds; cpu < ncpus; cpu++, fdp++) {
1015                         if (*fdp >= 0) {
1016                                 net_send_drops(*fdp);
1017                                 net_close_connection(fdp);
1018                         }
1019                 }
1020                 free(cl_fds);
1021         }
1022 }
1023
1024 static void setup_buts(void)
1025 {
1026         struct list_head *p;
1027
1028         __list_for_each(p, &devpaths) {
1029                 struct blk_user_trace_setup buts;
1030                 struct devpath *dpp = list_entry(p, struct devpath, head);
1031
1032                 memset(&buts, 0, sizeof(buts));
1033                 buts.buf_size = buf_size;
1034                 buts.buf_nr = buf_nr;
1035                 buts.act_mask = act_mask;
1036
1037                 if (ioctl(dpp->fd, BLKTRACESETUP, &buts) >= 0) {
1038                         dpp->ncpus = ncpus;
1039                         dpp->buts_name = strdup(buts.name);
1040                         if (dpp->stats)
1041                                 free(dpp->stats);
1042                         dpp->stats = calloc(dpp->ncpus, sizeof(*dpp->stats));
1043                         memset(dpp->stats, 0, dpp->ncpus * sizeof(*dpp->stats));
1044                 } else
1045                         fprintf(stderr, "BLKTRACESETUP(2) %s failed: %d/%s\n",
1046                                 dpp->path, errno, strerror(errno));
1047         }
1048 }
1049
1050 static void start_buts(void)
1051 {
1052         struct list_head *p;
1053
1054         __list_for_each(p, &devpaths) {
1055                 struct devpath *dpp = list_entry(p, struct devpath, head);
1056
1057                 if (ioctl(dpp->fd, BLKTRACESTART) < 0) {
1058                         fprintf(stderr, "BLKTRACESTART %s failed: %d/%s\n",
1059                                 dpp->path, errno, strerror(errno));
1060                 }
1061         }
1062 }
1063
1064 static int get_drops(struct devpath *dpp)
1065 {
1066         int fd, drops = 0;
1067         char fn[MAXPATHLEN + 64], tmp[256];
1068
1069         snprintf(fn, sizeof(fn), "%s/block/%s/dropped", debugfs_path,
1070                  dpp->buts_name);
1071
1072         fd = my_open(fn, O_RDONLY);
1073         if (fd < 0) {
1074                 /*
1075                  * This may be ok: the kernel may not support
1076                  * dropped counts.
1077                  */
1078                 if (errno != ENOENT)
1079                         fprintf(stderr, "Could not open %s: %d/%s\n",
1080                                 fn, errno, strerror(errno));
1081                 return 0;
1082         } else if (read(fd, tmp, sizeof(tmp)) < 0) {
1083                 fprintf(stderr, "Could not read %s: %d/%s\n",
1084                         fn, errno, strerror(errno));
1085         } else
1086                 drops = atoi(tmp);
1087         close(fd);
1088
1089         return drops;
1090 }
1091
1092 static void get_all_drops(void)
1093 {
1094         struct list_head *p;
1095
1096         __list_for_each(p, &devpaths) {
1097                 struct devpath *dpp = list_entry(p, struct devpath, head);
1098
1099                 dpp->drops = get_drops(dpp);
1100         }
1101 }
1102
1103 static inline struct trace_buf *alloc_trace_buf(int cpu, int bufsize)
1104 {
1105         struct trace_buf *tbp;
1106
1107         tbp = malloc(sizeof(*tbp) + bufsize);
1108         INIT_LIST_HEAD(&tbp->head);
1109         tbp->len = 0;
1110         tbp->buf = (void *)(tbp + 1);
1111         tbp->cpu = cpu;
1112         tbp->dpp = NULL;        /* Will be set when tbp is added */
1113
1114         return tbp;
1115 }
1116
1117 static void free_tracer_heads(struct devpath *dpp)
1118 {
1119         int cpu;
1120         struct tracer_devpath_head *hd;
1121
1122         for (cpu = 0, hd = dpp->heads; cpu < ncpus; cpu++, hd++) {
1123                 if (hd->prev)
1124                         free(hd->prev);
1125
1126                 pthread_mutex_destroy(&hd->mutex);
1127         }
1128         free(dpp->heads);
1129 }
1130
1131 static int setup_tracer_devpaths(void)
1132 {
1133         struct list_head *p;
1134
1135         if (net_client_use_send())
1136                 if (open_client_connections())
1137                         return 1;
1138
1139         __list_for_each(p, &devpaths) {
1140                 int cpu;
1141                 struct tracer_devpath_head *hd;
1142                 struct devpath *dpp = list_entry(p, struct devpath, head);
1143
1144                 dpp->heads = calloc(ncpus, sizeof(struct tracer_devpath_head));
1145                 for (cpu = 0, hd = dpp->heads; cpu < ncpus; cpu++, hd++) {
1146                         INIT_LIST_HEAD(&hd->head);
1147                         pthread_mutex_init(&hd->mutex, NULL);
1148                         hd->prev = NULL;
1149                 }
1150         }
1151
1152         return 0;
1153 }
1154
1155 static inline void add_trace_buf(struct devpath *dpp, int cpu,
1156                                                 struct trace_buf **tbpp)
1157 {
1158         struct trace_buf *tbp = *tbpp;
1159         struct tracer_devpath_head *hd = &dpp->heads[cpu];
1160
1161         tbp->dpp = dpp;
1162
1163         pthread_mutex_lock(&hd->mutex);
1164         list_add_tail(&tbp->head, &hd->head);
1165         pthread_mutex_unlock(&hd->mutex);
1166
1167         *tbpp = alloc_trace_buf(cpu, buf_size);
1168 }
1169
1170 static inline void incr_entries(int entries_handled)
1171 {
1172         pthread_mutex_lock(&dp_mutex);
1173         if (dp_entries == 0)
1174                 pthread_cond_signal(&dp_cond);
1175         dp_entries += entries_handled;
1176         pthread_mutex_unlock(&dp_mutex);
1177 }
1178
1179 static void decr_entries(int handled)
1180 {
1181         pthread_mutex_lock(&dp_mutex);
1182         dp_entries -= handled;
1183         pthread_mutex_unlock(&dp_mutex);
1184 }
1185
/*
 * Block until there is at least one queued trace buffer to process,
 * or until shutdown ('done') is flagged.
 *
 * Returns non-zero when there is (potentially) work to do, 0 when
 * the caller should exit its processing loop.
 */
static int wait_empty_entries(void)
{
	pthread_mutex_lock(&dp_mutex);
	while (!done && dp_entries == 0)
		t_pthread_cond_wait(&dp_cond, &dp_mutex);
	pthread_mutex_unlock(&dp_mutex);

	return !done;
}
1195
1196 static int add_devpath(char *path)
1197 {
1198         int fd;
1199         struct devpath *dpp;
1200
1201         /*
1202          * Verify device is valid before going too far
1203          */
1204         fd = my_open(path, O_RDONLY | O_NONBLOCK);
1205         if (fd < 0) {
1206                 fprintf(stderr, "Invalid path %s specified: %d/%s\n",
1207                         path, errno, strerror(errno));
1208                 return 1;
1209         }
1210
1211         dpp = malloc(sizeof(*dpp));
1212         memset(dpp, 0, sizeof(*dpp));
1213         dpp->path = strdup(path);
1214         dpp->fd = fd;
1215         dpp->idx = ndevs++;
1216         list_add_tail(&dpp->head, &devpaths);
1217
1218         return 0;
1219 }
1220
1221 static void rel_devpaths(void)
1222 {
1223         struct list_head *p, *q;
1224
1225         list_for_each_safe(p, q, &devpaths) {
1226                 struct devpath *dpp = list_entry(p, struct devpath, head);
1227
1228                 list_del(&dpp->head);
1229                 __stop_trace(dpp->fd);
1230                 close(dpp->fd);
1231
1232                 if (dpp->heads)
1233                         free_tracer_heads(dpp);
1234
1235                 dpp_free(dpp);
1236                 ndevs--;
1237         }
1238 }
1239
1240 static int flush_subbuf_net(struct trace_buf *tbp)
1241 {
1242         int fd = cl_fds[tbp->cpu];
1243         struct devpath *dpp = tbp->dpp;
1244
1245         if (net_send_header(fd, tbp->cpu, dpp->buts_name, tbp->len))
1246                 return 1;
1247         else if (net_send_data(fd, tbp->buf, tbp->len) != tbp->len)
1248                 return 1;
1249
1250         return 0;
1251 }
1252
1253 static int
1254 handle_list_net(__attribute__((__unused__))struct tracer_devpath_head *hd,
1255                 struct list_head *list)
1256 {
1257         struct trace_buf *tbp;
1258         struct list_head *p, *q;
1259         int entries_handled = 0;
1260
1261         list_for_each_safe(p, q, list) {
1262                 tbp = list_entry(p, struct trace_buf, head);
1263
1264                 list_del(&tbp->head);
1265                 entries_handled++;
1266
1267                 if (cl_fds[tbp->cpu] >= 0) {
1268                         if (flush_subbuf_net(tbp)) {
1269                                 close(cl_fds[tbp->cpu]);
1270                                 cl_fds[tbp->cpu] = -1;
1271                         }
1272                 }
1273
1274                 free(tbp);
1275         }
1276
1277         return entries_handled;
1278 }
1279
1280 /*
1281  * Tack 'tbp's buf onto the tail of 'prev's buf
1282  */
1283 static struct trace_buf *tb_combine(struct trace_buf *prev,
1284                                     struct trace_buf *tbp)
1285 {
1286         unsigned long tot_len;
1287
1288         tot_len = prev->len + tbp->len;
1289         if (tot_len > buf_size) {
1290                 /*
1291                  * tbp->head isn't connected (it was 'prev'
1292                  * so it had been taken off of the list
1293                  * before). Therefore, we can realloc
1294                  * the whole structures, as the other fields
1295                  * are "static".
1296                  */
1297                 prev = realloc(prev->buf, sizeof(*prev) + tot_len);
1298                 prev->buf = (void *)(prev + 1);
1299         }
1300
1301         memcpy(prev->buf + prev->len, tbp->buf, tbp->len);
1302         prev->len = tot_len;
1303
1304         free(tbp);
1305         return prev;
1306 }
1307
/*
 * File-mode list handler: drain queued trace buffers for one
 * device/CPU pair, writing out only whole blk_io_trace records.
 * Any trailing partial record is carried over in hd->prev and
 * prepended to the next batch.
 *
 * Returns the number of buffers taken off the list.
 */
static int handle_list_file(struct tracer_devpath_head *hd,
			    struct list_head *list)
{
	int off, t_len, nevents;
	struct blk_io_trace *t;
	struct list_head *p, *q;
	int entries_handled = 0;
	struct trace_buf *tbp, *prev;

	prev = hd->prev;
	list_for_each_safe(p, q, list) {
		tbp = list_entry(p, struct trace_buf, head);
		list_del(&tbp->head);
		entries_handled++;

		/*
		 * If there was some leftover before, tack this new
		 * entry onto the tail of the previous one.
		 */
		if (prev)
			tbp = tb_combine(prev, tbp);

		/*
		 * See how many whole traces there are - send them
		 * all out in one go.
		 */
		off = 0;
		nevents = 0;
		while (off + (int)sizeof(*t) <= tbp->len) {
			t = (struct blk_io_trace *)(tbp->buf + off);
			/* Each record is the fixed header plus its PDU */
			t_len = sizeof(*t) + t->pdu_len;
			if (off + t_len > tbp->len)
				break;

			off += t_len;
			nevents++;
		}
		if (nevents)
			pdc_nev_update(tbp->dpp, tbp->cpu, nevents);

		/*
		 * Write any full set of traces, any remaining data is kept
		 * for the next pass.
		 */
		if (off) {
			/*
			 * Buffer fully consumed (or write failed): free it.
			 * On a write failure the unwritten tail is dropped.
			 */
			if (write_data(tbp->buf, off) || off == tbp->len) {
				free(tbp);
				prev = NULL;
			}
			else {
				/*
				 * Move valid data to beginning of buffer
				 */
				tbp->len -= off;
				memmove(tbp->buf, tbp->buf + off, tbp->len);
				prev = tbp;
			}
		} else
			/* No complete record yet: keep everything for later */
			prev = tbp;
	}
	hd->prev = prev;

	return entries_handled;
}
1372
1373 static void __process_trace_bufs(void)
1374 {
1375         int cpu;
1376         struct list_head *p;
1377         struct list_head list;
1378         int handled = 0;
1379
1380         __list_for_each(p, &devpaths) {
1381                 struct devpath *dpp = list_entry(p, struct devpath, head);
1382                 struct tracer_devpath_head *hd = dpp->heads;
1383
1384                 for (cpu = 0; cpu < ncpus; cpu++, hd++) {
1385                         pthread_mutex_lock(&hd->mutex);
1386                         if (list_empty(&hd->head)) {
1387                                 pthread_mutex_unlock(&hd->mutex);
1388                                 continue;
1389                         }
1390
1391                         list_replace_init(&hd->head, &list);
1392                         pthread_mutex_unlock(&hd->mutex);
1393
1394                         handled += handle_list(hd, &list);
1395                 }
1396         }
1397
1398         if (handled)
1399                 decr_entries(handled);
1400 }
1401
/*
 * Main buffer-processing loop: block until work is queued, then
 * drain all per-device/per-CPU lists; repeat until shutdown.
 */
static void process_trace_bufs(void)
{
	while (wait_empty_entries())
		__process_trace_bufs();
}
1407
/*
 * Final drain after all tracer threads have exited: keep processing
 * until the global entry count reaches zero.
 */
static void clean_trace_bufs(void)
{
	/*
	 * No mutex needed here: we're only reading from the lists,
	 * tracers are done
	 */
	while (dp_entries)
		__process_trace_bufs();
}
1417
/*
 * Report a failed read of a per-CPU trace file. EAGAIN is expected
 * on the non-blocking fds and is silently ignored.
 */
static inline void read_err(int cpu, char *ifn)
{
	if (errno == EAGAIN)
		return;

	fprintf(stderr, "Thread %d failed read of %s: %d/%s\n",
		cpu, ifn, errno, strerror(errno));
}
1424
1425 static int net_sendfile(struct io_info *iop)
1426 {
1427         int ret;
1428
1429         ret = sendfile(iop->ofd, iop->ifd, NULL, iop->ready);
1430         if (ret < 0) {
1431                 perror("sendfile");
1432                 return 1;
1433         } else if (ret < (int)iop->ready) {
1434                 fprintf(stderr, "short sendfile send (%d of %d)\n",
1435                         ret, iop->ready);
1436                 return 1;
1437         }
1438
1439         return 0;
1440 }
1441
1442 static inline int net_sendfile_data(struct tracer *tp, struct io_info *iop)
1443 {
1444         struct devpath *dpp = iop->dpp;
1445
1446         if (net_send_header(iop->ofd, tp->cpu, dpp->buts_name, iop->ready))
1447                 return 1;
1448         return net_sendfile(iop);
1449 }
1450
1451 static int fill_ofname(struct io_info *iop, int cpu)
1452 {
1453         int len;
1454         struct stat sb;
1455         char *dst = iop->ofn;
1456
1457         if (output_dir)
1458                 len = snprintf(iop->ofn, sizeof(iop->ofn), "%s/", output_dir);
1459         else
1460                 len = snprintf(iop->ofn, sizeof(iop->ofn), "./");
1461
1462         if (net_mode == Net_server) {
1463                 struct cl_conn *nc = iop->nc;
1464
1465                 len += sprintf(dst + len, "%s-", nc->ch->hostname);
1466                 len += strftime(dst + len, 64, "%F-%T/",
1467                                 gmtime(&iop->dpp->cl_connect_time));
1468         }
1469
1470         if (stat(iop->ofn, &sb) < 0) {
1471                 if (errno != ENOENT) {
1472                         fprintf(stderr,
1473                                 "Destination dir %s stat failed: %d/%s\n",
1474                                 iop->ofn, errno, strerror(errno));
1475                         return 1;
1476                 }
1477                 if (mkdir(iop->ofn, 0755) < 0) {
1478                         fprintf(stderr,
1479                                 "Destination dir %s can't be made: %d/%s\n",
1480                                 iop->ofn, errno, strerror(errno));
1481                         return 1;
1482                 }
1483         }
1484
1485         if (output_name)
1486                 snprintf(iop->ofn + len, sizeof(iop->ofn), "%s.blktrace.%d",
1487                          output_name, cpu);
1488         else
1489                 snprintf(iop->ofn + len, sizeof(iop->ofn), "%s.blktrace.%d",
1490                          iop->dpp->buts_name, cpu);
1491
1492         return 0;
1493 }
1494
1495 static int set_vbuf(struct io_info *iop, int mode, size_t size)
1496 {
1497         iop->obuf = malloc(size);
1498         if (setvbuf(iop->ofp, iop->obuf, mode, size) < 0) {
1499                 fprintf(stderr, "setvbuf(%s, %d) failed: %d/%s\n",
1500                         iop->dpp->path, (int)size, errno,
1501                         strerror(errno));
1502                 free(iop->obuf);
1503                 return 1;
1504         }
1505
1506         return 0;
1507 }
1508
1509 static int iop_open(struct io_info *iop, int cpu)
1510 {
1511         iop->ofd = -1;
1512         if (fill_ofname(iop, cpu))
1513                 return 1;
1514
1515         iop->ofp = my_fopen(iop->ofn, "w+");
1516         if (iop->ofp == NULL) {
1517                 fprintf(stderr, "Open output file %s failed: %d/%s\n",
1518                         iop->ofn, errno, strerror(errno));
1519                 return 1;
1520         }
1521
1522         if (set_vbuf(iop, _IOLBF, FILE_VBUF_SIZE)) {
1523                 fprintf(stderr, "set_vbuf for file %s failed: %d/%s\n",
1524                         iop->ofn, errno, strerror(errno));
1525                 fclose(iop->ofp);
1526                 return 1;
1527         }
1528
1529         iop->ofd = fileno(iop->ofp);
1530         return 0;
1531 }
1532
1533 static void close_iop(struct io_info *iop)
1534 {
1535         struct mmap_info *mip = &iop->mmap_info;
1536
1537         if (mip->fs_buf)
1538                 munmap(mip->fs_buf, mip->fs_buf_len);
1539
1540         if (!piped_output) {
1541                 if (ftruncate(fileno(iop->ofp), mip->fs_size) < 0) {
1542                         fprintf(stderr,
1543                                 "Ignoring err: ftruncate(%s): %d/%s\n",
1544                                 iop->ofn, errno, strerror(errno));
1545                 }
1546         }
1547
1548         if (iop->ofp)
1549                 fclose(iop->ofp);
1550         if (iop->obuf)
1551                 free(iop->obuf);
1552 }
1553
1554 static void close_ios(struct tracer *tp)
1555 {
1556         while (tp->nios > 0) {
1557                 struct io_info *iop = &tp->ios[--tp->nios];
1558
1559                 iop->dpp->drops = get_drops(iop->dpp);
1560                 if (iop->ifd >= 0)
1561                         close(iop->ifd);
1562
1563                 if (iop->ofp)
1564                         close_iop(iop);
1565                 else if (iop->ofd >= 0) {
1566                         struct devpath *dpp = iop->dpp;
1567
1568                         net_send_close(iop->ofd, dpp->buts_name, dpp->drops);
1569                         net_close_connection(&iop->ofd);
1570                 }
1571         }
1572
1573         free(tp->ios);
1574         free(tp->pfds);
1575 }
1576
1577 static int open_ios(struct tracer *tp)
1578 {
1579         struct pollfd *pfd;
1580         struct io_info *iop;
1581         struct list_head *p;
1582
1583         tp->ios = calloc(ndevs, sizeof(struct io_info));
1584         memset(tp->ios, 0, ndevs * sizeof(struct io_info));
1585
1586         tp->pfds = calloc(ndevs, sizeof(struct pollfd));
1587         memset(tp->pfds, 0, ndevs * sizeof(struct pollfd));
1588
1589         tp->nios = 0;
1590         iop = tp->ios;
1591         pfd = tp->pfds;
1592         __list_for_each(p, &devpaths) {
1593                 struct devpath *dpp = list_entry(p, struct devpath, head);
1594
1595                 iop->dpp = dpp;
1596                 iop->ofd = -1;
1597                 snprintf(iop->ifn, sizeof(iop->ifn), "%s/block/%s/trace%d",
1598                         debugfs_path, dpp->buts_name, tp->cpu);
1599
1600                 iop->ifd = my_open(iop->ifn, O_RDONLY | O_NONBLOCK);
1601                 if (iop->ifd < 0) {
1602                         fprintf(stderr, "Thread %d failed open %s: %d/%s\n",
1603                                 tp->cpu, iop->ifn, errno, strerror(errno));
1604                         return 1;
1605                 }
1606
1607                 init_mmap_info(&iop->mmap_info);
1608
1609                 pfd->fd = iop->ifd;
1610                 pfd->events = POLLIN;
1611
1612                 if (piped_output)
1613                         ;
1614                 else if (net_client_use_sendfile()) {
1615                         iop->ofd = net_setup_client();
1616                         if (iop->ofd < 0)
1617                                 goto err;
1618                         net_send_open(iop->ofd, tp->cpu, dpp->buts_name);
1619                 } else if (net_mode == Net_none) {
1620                         if (iop_open(iop, tp->cpu))
1621                                 goto err;
1622                 } else {
1623                         /*
1624                          * This ensures that the server knows about all
1625                          * connections & devices before _any_ closes
1626                          */
1627                         net_send_open(cl_fds[tp->cpu], tp->cpu, dpp->buts_name);
1628                 }
1629
1630                 pfd++;
1631                 iop++;
1632                 tp->nios++;
1633         }
1634
1635         return 0;
1636
1637 err:
1638         close(iop->ifd);        /* tp->nios _not_ bumped */
1639         close_ios(tp);
1640         return 1;
1641 }
1642
/*
 * File-mode event handler: for each device with pending input (or
 * unconditionally when force_read is set), read trace data from the
 * debugfs file directly into an mmap'ed window of the output file.
 *
 * 'nevs' bounds how many devices are serviced this pass. Returns the
 * number of devices from which data was actually read.
 */
static int handle_pfds_file(struct tracer *tp, int nevs, int force_read)
{
	struct mmap_info *mip;
	int i, ret, nentries = 0;
	struct pollfd *pfd = tp->pfds;
	struct io_info *iop = tp->ios;

	for (i = 0; nevs > 0 && i < ndevs; i++, pfd++, iop++) {
		if (pfd->revents & POLLIN || force_read) {
			mip = &iop->mmap_info;

			/* Ensure the output window has room for buf_size */
			ret = setup_mmap(iop->ofd, buf_size, mip);
			if (ret < 0) {
				pfd->events = 0;
				break;
			}

			/* Read straight into the mapped output file */
			ret = read(iop->ifd, mip->fs_buf + mip->fs_off,
				   buf_size);
			if (ret > 0) {
				pdc_dr_update(iop->dpp, tp->cpu, ret);
				mip->fs_size += ret;
				mip->fs_off += ret;
				nentries++;
			} else if (ret == 0) {
				/*
				 * Short reads after we're done stop us
				 * from trying reads.
				 */
				if (tp->is_done)
					clear_events(pfd);
			} else {
				read_err(tp->cpu, iop->ifn);
				if (errno != EAGAIN || tp->is_done)
					clear_events(pfd);
			}
			nevs--;
		}
	}

	return nentries;
}
1685
1686 static int handle_pfds_netclient(struct tracer *tp, int nevs, int force_read)
1687 {
1688         struct stat sb;
1689         int i, nentries = 0;
1690         struct pdc_stats *sp;
1691         struct pollfd *pfd = tp->pfds;
1692         struct io_info *iop = tp->ios;
1693
1694         for (i = 0; i < ndevs; i++, pfd++, iop++, sp++) {
1695                 if (pfd->revents & POLLIN || force_read) {
1696                         if (fstat(iop->ifd, &sb) < 0) {
1697                                 perror(iop->ifn);
1698                                 pfd->events = 0;
1699                         } else if (sb.st_size > (off_t)iop->data_queued) {
1700                                 iop->ready = sb.st_size - iop->data_queued;
1701                                 iop->data_queued = sb.st_size;
1702
1703                                 if (!net_sendfile_data(tp, iop)) {
1704                                         pdc_dr_update(iop->dpp, tp->cpu,
1705                                                       iop->ready);
1706                                         nentries++;
1707                                 } else
1708                                         clear_events(pfd);
1709                         }
1710                         if (--nevs == 0)
1711                                 break;
1712                 }
1713         }
1714
1715         if (nentries)
1716                 incr_entries(nentries);
1717
1718         return nentries;
1719 }
1720
/*
 * Buffered-mode event handler: read trace data into freshly
 * allocated trace buffers and queue them on the per-device/per-CPU
 * lists for the processing thread (used for piped output and the
 * send-style network client).
 *
 * 'nevs' bounds how many ready devices are serviced (ignored for
 * piped output). Returns the number of buffers queued.
 */
static int handle_pfds_entries(struct tracer *tp, int nevs, int force_read)
{
	int i, nentries = 0;
	struct trace_buf *tbp;
	struct pollfd *pfd = tp->pfds;
	struct io_info *iop = tp->ios;

	tbp = alloc_trace_buf(tp->cpu, buf_size);
	for (i = 0; i < ndevs; i++, pfd++, iop++) {
		if (pfd->revents & POLLIN || force_read) {
			tbp->len = read(iop->ifd, tbp->buf, buf_size);
			if (tbp->len > 0) {
				pdc_dr_update(iop->dpp, tp->cpu, tbp->len);
				/* Queues tbp and replaces it with a new buf */
				add_trace_buf(iop->dpp, tp->cpu, &tbp);
				nentries++;
			} else if (tbp->len == 0) {
				/*
				 * Short reads after we're done stop us
				 * from trying reads.
				 */
				if (tp->is_done)
					clear_events(pfd);
			} else {
				read_err(tp->cpu, iop->ifn);
				if (errno != EAGAIN || tp->is_done)
					clear_events(pfd);
			}
			if (!piped_output && --nevs == 0)
				break;
		}
	}
	/* The last allocated buffer is always unused at this point */
	free(tbp);

	if (nentries)
		incr_entries(nentries);

	return nentries;
}
1759
1760 static void *thread_main(void *arg)
1761 {
1762         int ret, ndone, to_val;
1763         struct tracer *tp = arg;
1764
1765         ret = lock_on_cpu(tp->cpu);
1766         if (ret)
1767                 goto err;
1768
1769         ret = open_ios(tp);
1770         if (ret)
1771                 goto err;
1772
1773         if (piped_output)
1774                 to_val = 50;            /* Frequent partial handles */
1775         else
1776                 to_val = 500;           /* 1/2 second intervals */
1777
1778
1779         tracer_signal_ready(tp, Th_running, 0);
1780         tracer_wait_unblock(tp);
1781
1782         while (!tp->is_done) {
1783                 ndone = poll(tp->pfds, ndevs, to_val);
1784                 if (ndone || piped_output)
1785                         (void)handle_pfds(tp, ndone, piped_output);
1786                 else if (ndone < 0 && errno != EINTR)
1787                         fprintf(stderr, "Thread %d poll failed: %d/%s\n",
1788                                 tp->cpu, errno, strerror(errno));
1789         }
1790
1791         /*
1792          * Trace is stopped, pull data until we get a short read
1793          */
1794         while (handle_pfds(tp, ndevs, 1) > 0)
1795                 ;
1796
1797         close_ios(tp);
1798         tracer_signal_ready(tp, Th_leaving, 0);
1799         return NULL;
1800
1801 err:
1802         tracer_signal_ready(tp, Th_error, ret);
1803         return NULL;
1804 }
1805
1806 static int start_tracer(int cpu)
1807 {
1808         struct tracer *tp;
1809
1810         tp = malloc(sizeof(*tp));
1811         memset(tp, 0, sizeof(*tp));
1812
1813         INIT_LIST_HEAD(&tp->head);
1814         tp->status = 0;
1815         tp->cpu = cpu;
1816
1817         if (pthread_create(&tp->thread, NULL, thread_main, tp)) {
1818                 fprintf(stderr, "FAILED to start thread on CPU %d: %d/%s\n",
1819                         cpu, errno, strerror(errno));
1820                 free(tp);
1821                 return 1;
1822         }
1823
1824         list_add_tail(&tp->head, &tracers);
1825         return 0;
1826 }
1827
1828 static void start_tracers(void)
1829 {
1830         int cpu;
1831         struct list_head *p;
1832
1833         for (cpu = 0; cpu < ncpus; cpu++)
1834                 if (start_tracer(cpu))
1835                         break;
1836
1837         wait_tracers_ready(cpu);
1838
1839         __list_for_each(p, &tracers) {
1840                 struct tracer *tp = list_entry(p, struct tracer, head);
1841                 if (tp->status)
1842                         fprintf(stderr,
1843                                 "FAILED to start thread on CPU %d: %d/%s\n",
1844                                 tp->cpu, tp->status, strerror(tp->status));
1845         }
1846 }
1847
/*
 * Stop all tracing: halt the kernel-side trace on every device and
 * flag every tracer thread to exit its poll loop.
 */
static void stop_tracers(void)
{
	struct list_head *p;

	/*
	 * Stop the tracing - makes the tracer threads clean up quicker.
	 */
	__list_for_each(p, &devpaths) {
		struct devpath *dpp = list_entry(p, struct devpath, head);
		(void)ioctl(dpp->fd, BLKTRACESTOP);
	}

	/*
	 * Tell each tracer to quit
	 */
	__list_for_each(p, &tracers) {
		struct tracer *tp = list_entry(p, struct tracer, head);
		tp->is_done = 1;
	}
}
1868
1869 static void del_tracers(void)
1870 {
1871         struct list_head *p, *q;
1872
1873         list_for_each_safe(p, q, &tracers) {
1874                 struct tracer *tp = list_entry(p, struct tracer, head);
1875
1876                 list_del(&tp->head);
1877                 free(tp);
1878         }
1879 }
1880
1881 static void wait_tracers(void)
1882 {
1883         struct list_head *p;
1884
1885         if (use_tracer_devpaths())
1886                 process_trace_bufs();
1887
1888         wait_tracers_leaving();
1889
1890         __list_for_each(p, &tracers) {
1891                 int ret;
1892                 struct tracer *tp = list_entry(p, struct tracer, head);
1893
1894                 ret = pthread_join(tp->thread, NULL);
1895                 if (ret)
1896                         fprintf(stderr, "Thread join %d failed %d\n",
1897                                 tp->cpu, ret);
1898         }
1899
1900         if (use_tracer_devpaths())
1901                 clean_trace_bufs();
1902
1903         get_all_drops();
1904 }
1905
/*
 * Orderly shutdown path: ignore further termination signals, stop
 * the tracers, wait for them, then release all tracer and device
 * resources.
 */
static void exit_tracing(void)
{
	signal(SIGINT, SIG_IGN);
	signal(SIGHUP, SIG_IGN);
	signal(SIGTERM, SIG_IGN);
	signal(SIGALRM, SIG_IGN);

	stop_tracers();
	wait_tracers();
	del_tracers();
	rel_devpaths();
}
1918
/*
 * SIGINT/termination handler: flag the main loop to exit and stop
 * the kernel-side tracing so threads wind down quickly.
 *
 * NOTE(review): stop_tracers() walks lists and calls ioctl(), which
 * is not strictly async-signal-safe — confirm this is acceptable in
 * context.
 */
static void handle_sigint(__attribute__((__unused__)) int sig)
{
	done = 1;
	stop_tracers();
}
1924
1925 static void show_stats(struct list_head *devpaths)
1926 {
1927         FILE *ofp;
1928         struct list_head *p;
1929         unsigned long long nevents, data_read;
1930         unsigned long long total_drops = 0;
1931         unsigned long long total_events = 0;
1932
1933         if (piped_output)
1934                 ofp = my_fopen("/dev/null", "w");
1935         else
1936                 ofp = stdout;
1937
1938         __list_for_each(p, devpaths) {
1939                 int cpu;
1940                 struct pdc_stats *sp;
1941                 struct devpath *dpp = list_entry(p, struct devpath, head);
1942
1943                 if (net_mode == Net_server)
1944                         printf("server: end of run for %s:%s\n",
1945                                 dpp->ch->hostname, dpp->buts_name);
1946
1947                 data_read = 0;
1948                 nevents = 0;
1949
1950                 fprintf(ofp, "=== %s ===\n", dpp->buts_name);
1951                 for (cpu = 0, sp = dpp->stats; cpu < dpp->ncpus; cpu++, sp++) {
1952                         /*
1953                          * Estimate events if not known...
1954                          */
1955                         if (sp->nevents == 0) {
1956                                 sp->nevents = sp->data_read /
1957                                                 sizeof(struct blk_io_trace);
1958                         }
1959
1960                         fprintf(ofp,
1961                                 "  CPU%3d: %20llu events, %8llu KiB data\n",
1962                                 cpu, sp->nevents, (sp->data_read + 1023) >> 10);
1963
1964                         data_read += sp->data_read;
1965                         nevents += sp->nevents;
1966                 }
1967
1968                 fprintf(ofp, "  Total:  %20llu events (dropped %llu),"
1969                              " %8llu KiB data\n", nevents,
1970                              dpp->drops, (data_read + 1024) >> 10);
1971
1972                 total_drops += dpp->drops;
1973                 total_events += (nevents + dpp->drops);
1974         }
1975
1976         fflush(ofp);
1977         if (piped_output)
1978                 fclose(ofp);
1979
1980         if (total_drops) {
1981                 double drops_ratio = 1.0;
1982
1983                 if (total_events)
1984                         drops_ratio = (double)total_drops/(double)total_events;
1985
1986                 fprintf(stderr, "\nYou have %llu (%5.1lf%%) dropped events\n"
1987                                 "Consider using a larger buffer size (-b) "
1988                                 "and/or more buffers (-n)\n",
1989                         total_drops, 100.0 * drops_ratio);
1990         }
1991 }
1992
1993 static int handle_args(int argc, char *argv[])
1994 {
1995         int c, i;
1996         struct statfs st;
1997         int act_mask_tmp = 0;
1998
1999         while ((c = getopt_long(argc, argv, S_OPTS, l_opts, NULL)) >= 0) {
2000                 switch (c) {
2001                 case 'a':
2002                         i = find_mask_map(optarg);
2003                         if (i < 0) {
2004                                 fprintf(stderr, "Invalid action mask %s\n",
2005                                         optarg);
2006                                 return 1;
2007                         }
2008                         act_mask_tmp |= i;
2009                         break;
2010
2011                 case 'A':
2012                         if ((sscanf(optarg, "%x", &i) != 1) ||
2013                                                         !valid_act_opt(i)) {
2014                                 fprintf(stderr,
2015                                         "Invalid set action mask %s/0x%x\n",
2016                                         optarg, i);
2017                                 return 1;
2018                         }
2019                         act_mask_tmp = i;
2020                         break;
2021
2022                 case 'd':
2023                         if (add_devpath(optarg) != 0)
2024                                 return 1;
2025                         break;
2026
2027                 case 'I': {
2028                         char dev_line[256];
2029                         FILE *ifp = my_fopen(optarg, "r");
2030
2031                         if (!ifp) {
2032                                 fprintf(stderr,
2033                                         "Invalid file for devices %s\n",
2034                                         optarg);
2035                                 return 1;
2036                         }
2037
2038                         while (fscanf(ifp, "%s\n", dev_line) == 1)
2039                                 if (add_devpath(dev_line) != 0)
2040                                         return 1;
2041                         break;
2042                 }
2043
2044                 case 'r':
2045                         debugfs_path = optarg;
2046                         break;
2047
2048                 case 'o':
2049                         output_name = optarg;
2050                         break;
2051                 case 'k':
2052                         kill_running_trace = 1;
2053                         break;
2054                 case 'w':
2055                         stop_watch = atoi(optarg);
2056                         if (stop_watch <= 0) {
2057                                 fprintf(stderr,
2058                                         "Invalid stopwatch value (%d secs)\n",
2059                                         stop_watch);
2060                                 return 1;
2061                         }
2062                         break;
2063                 case 'V':
2064                 case 'v':
2065                         printf("%s version %s\n", argv[0], blktrace_version);
2066                         exit(0);
2067                         /*NOTREACHED*/
2068                 case 'b':
2069                         buf_size = strtoul(optarg, NULL, 10);
2070                         if (buf_size <= 0 || buf_size > 16*1024) {
2071                                 fprintf(stderr, "Invalid buffer size (%lu)\n",
2072                                         buf_size);
2073                                 return 1;
2074                         }
2075                         buf_size <<= 10;
2076                         break;
2077                 case 'n':
2078                         buf_nr = strtoul(optarg, NULL, 10);
2079                         if (buf_nr <= 0) {
2080                                 fprintf(stderr,
2081                                         "Invalid buffer nr (%lu)\n", buf_nr);
2082                                 return 1;
2083                         }
2084                         break;
2085                 case 'D':
2086                         output_dir = optarg;
2087                         break;
2088                 case 'h':
2089                         net_mode = Net_client;
2090                         strcpy(hostname, optarg);
2091                         break;
2092                 case 'l':
2093                         net_mode = Net_server;
2094                         break;
2095                 case 'p':
2096                         net_port = atoi(optarg);
2097                         break;
2098                 case 's':
2099                         net_use_sendfile = 0;
2100                         break;
2101                 default:
2102                         show_usage(argv[0]);
2103                         exit(1);
2104                         /*NOTREACHED*/
2105                 }
2106         }
2107
2108         while (optind < argc)
2109                 if (add_devpath(argv[optind++]) != 0)
2110                         return 1;
2111
2112         if (net_mode != Net_server && ndevs == 0) {
2113                 show_usage(argv[0]);
2114                 return 1;
2115         }
2116
2117         if (statfs(debugfs_path, &st) < 0 || st.f_type != (long)DEBUGFS_TYPE) {
2118                 fprintf(stderr, "Invalid debug path %s: %d/%s\n",
2119                         debugfs_path, errno, strerror(errno));
2120                 return 1;
2121         }
2122
2123         if (act_mask_tmp != 0)
2124                 act_mask = act_mask_tmp;
2125
2126         if (net_mode == Net_client && net_setup_addr())
2127                 return 1;
2128
2129         /*
2130          * Set up for appropriate PFD handler based upon output name.
2131          */
2132         if (net_client_use_sendfile())
2133                 handle_pfds = handle_pfds_netclient;
2134         else if (net_client_use_send())
2135                 handle_pfds = handle_pfds_entries;
2136         else if (output_name && (strcmp(output_name, "-") == 0)) {
2137                 piped_output = 1;
2138                 handle_pfds = handle_pfds_entries;
2139                 pfp = stdout;
2140                 setvbuf(pfp, NULL, _IONBF, 0);
2141         } else
2142                 handle_pfds = handle_pfds_file;
2143         return 0;
2144 }
2145
2146 static void ch_add_connection(struct net_server_s *ns, struct cl_host *ch,
2147                               int fd)
2148 {
2149         struct cl_conn *nc;
2150
2151         nc = malloc(sizeof(*nc));
2152         memset(nc, 0, sizeof(*nc));
2153
2154         time(&nc->connect_time);
2155         nc->ch = ch;
2156         nc->fd = fd;
2157         nc->ncpus = -1;
2158
2159         list_add_tail(&nc->ch_head, &ch->conn_list);
2160         ch->connects++;
2161
2162         list_add_tail(&nc->ns_head, &ns->conn_list);
2163         ns->connects++;
2164         ns->pfds = realloc(ns->pfds, (ns->connects+1) * sizeof(struct pollfd));
2165 }
2166
2167 static void ch_rem_connection(struct net_server_s *ns, struct cl_host *ch,
2168                               struct cl_conn *nc)
2169 {
2170         net_close_connection(&nc->fd);
2171
2172         list_del(&nc->ch_head);
2173         ch->connects--;
2174
2175         list_del(&nc->ns_head);
2176         ns->connects--;
2177         ns->pfds = realloc(ns->pfds, (ns->connects+1) * sizeof(struct pollfd));
2178
2179         free(nc);
2180 }
2181
2182 static struct cl_host *net_find_client_host(struct net_server_s *ns,
2183                                             struct in_addr cl_in_addr)
2184 {
2185         struct list_head *p;
2186
2187         __list_for_each(p, &ns->ch_list) {
2188                 struct cl_host *ch = list_entry(p, struct cl_host, head);
2189
2190                 if (in_addr_eq(ch->cl_in_addr, cl_in_addr))
2191                         return ch;
2192         }
2193
2194         return NULL;
2195 }
2196
2197 static struct cl_host *net_add_client_host(struct net_server_s *ns,
2198                                            struct sockaddr_in *addr)
2199 {
2200         struct cl_host *ch;
2201
2202         ch = malloc(sizeof(*ch));
2203         memset(ch, 0, sizeof(*ch));
2204
2205         ch->ns = ns;
2206         ch->cl_in_addr = addr->sin_addr;
2207         list_add_tail(&ch->head, &ns->ch_list);
2208         ns->nchs++;
2209
2210         ch->hostname = strdup(inet_ntoa(addr->sin_addr));
2211         printf("server: connection from %s\n", ch->hostname);
2212
2213         INIT_LIST_HEAD(&ch->conn_list);
2214         INIT_LIST_HEAD(&ch->devpaths);
2215
2216         return ch;
2217 }
2218
2219 static void device_done(struct devpath *dpp, int ncpus)
2220 {
2221         int cpu;
2222         struct io_info *iop;
2223
2224         for (cpu = 0, iop = dpp->ios; cpu < ncpus; cpu++, iop++)
2225                 close_iop(iop);
2226
2227         list_del(&dpp->head);
2228         dpp_free(dpp);
2229 }
2230
2231 static void net_ch_remove(struct cl_host *ch, int ncpus)
2232 {
2233         struct list_head *p, *q;
2234         struct net_server_s *ns = ch->ns;
2235
2236         list_for_each_safe(p, q, &ch->devpaths) {
2237                 struct devpath *dpp = list_entry(p, struct devpath, head);
2238                 device_done(dpp, ncpus);
2239         }
2240
2241         list_for_each_safe(p, q, &ch->conn_list) {
2242                 struct cl_conn *nc = list_entry(p, struct cl_conn, ch_head);
2243
2244                 ch_rem_connection(ns, ch, nc);
2245         }
2246
2247         list_del(&ch->head);
2248         ns->nchs--;
2249
2250         if (ch->hostname)
2251                 free(ch->hostname);
2252         free(ch);
2253 }
2254
2255 static void net_add_connection(struct net_server_s *ns)
2256 {
2257         int fd;
2258         struct cl_host *ch;
2259         socklen_t socklen = sizeof(ns->addr);
2260
2261         fd = my_accept(ns->listen_fd, (struct sockaddr *)&ns->addr, &socklen);
2262         if (fd < 0) {
2263                 /*
2264                  * This is OK: we just won't accept this connection,
2265                  * nothing fatal.
2266                  */
2267                 perror("accept");
2268         } else {
2269                 ch = net_find_client_host(ns, ns->addr.sin_addr);
2270                 if (!ch)
2271                         ch = net_add_client_host(ns, &ns->addr);
2272
2273                 ch_add_connection(ns, ch, fd);
2274         }
2275 }
2276
2277 static struct devpath *nc_add_dpp(struct cl_conn *nc,
2278                                   struct blktrace_net_hdr *bnh,
2279                                   time_t connect_time)
2280 {
2281         int cpu;
2282         struct io_info *iop;
2283         struct devpath *dpp;
2284
2285         dpp = malloc(sizeof(*dpp));
2286         memset(dpp, 0, sizeof(*dpp));
2287
2288         dpp->buts_name = strdup(bnh->buts_name);
2289         dpp->path = strdup(bnh->buts_name);
2290         dpp->fd = -1;
2291         dpp->ch = nc->ch;
2292         dpp->cl_id = bnh->cl_id;
2293         dpp->cl_connect_time = connect_time;
2294         dpp->ncpus = nc->ncpus;
2295         dpp->stats = calloc(dpp->ncpus, sizeof(*dpp->stats));
2296         memset(dpp->stats, 0, dpp->ncpus * sizeof(*dpp->stats));
2297
2298         list_add_tail(&dpp->head, &nc->ch->devpaths);
2299         nc->ch->ndevs++;
2300
2301         dpp->ios = calloc(nc->ncpus, sizeof(*iop));
2302         memset(dpp->ios, 0, ndevs * sizeof(*iop));
2303
2304         for (cpu = 0, iop = dpp->ios; cpu < nc->ncpus; cpu++, iop++) {
2305                 iop->dpp = dpp;
2306                 iop->nc = nc;
2307                 init_mmap_info(&iop->mmap_info);
2308
2309                 if (iop_open(iop, cpu))
2310                         goto err;
2311         }
2312
2313         return dpp;
2314
2315 err:
2316         /*
2317          * Need to unravel what's been done...
2318          */
2319         while (cpu >= 0)
2320                 close_iop(&dpp->ios[cpu--]);
2321         dpp_free(dpp);
2322
2323         return NULL;
2324 }
2325
2326 static struct devpath *nc_find_dpp(struct cl_conn *nc,
2327                                    struct blktrace_net_hdr *bnh)
2328 {
2329         struct list_head *p;
2330         time_t connect_time = nc->connect_time;
2331
2332         __list_for_each(p, &nc->ch->devpaths) {
2333                 struct devpath *dpp = list_entry(p, struct devpath, head);
2334
2335                 if (!strcmp(dpp->buts_name, bnh->buts_name))
2336                         return dpp;
2337
2338                 if (dpp->cl_id == bnh->cl_id)
2339                         connect_time = dpp->cl_connect_time;
2340         }
2341
2342         return nc_add_dpp(nc, bnh, connect_time);
2343 }
2344
2345 static void net_client_read_data(struct cl_conn *nc, struct devpath *dpp,
2346                                  struct blktrace_net_hdr *bnh)
2347 {
2348         int ret;
2349         struct io_info *iop = &dpp->ios[bnh->cpu];
2350         struct mmap_info *mip = &iop->mmap_info;
2351
2352         if (setup_mmap(iop->ofd, bnh->len, &iop->mmap_info)) {
2353                 fprintf(stderr, "ncd(%s:%d): mmap failed\n",
2354                         nc->ch->hostname, nc->fd);
2355                 exit(1);
2356         }
2357
2358         ret = net_recv_data(nc->fd, mip->fs_buf + mip->fs_off, bnh->len);
2359         if (ret > 0) {
2360                 pdc_dr_update(dpp, bnh->cpu, ret);
2361                 mip->fs_size += ret;
2362                 mip->fs_off += ret;
2363         } else if (ret < 0)
2364                 exit(1);
2365 }
2366
2367 /*
2368  * Returns 1 if we closed a host - invalidates other polling information
2369  * that may be present.
2370  */
static int net_client_data(struct cl_conn *nc)
{
        int ret;
        struct devpath *dpp;
        struct blktrace_net_hdr bnh;

        /* 0 here means "no full header yet" -- nothing to do this pass. */
        ret = net_get_header(nc, &bnh);
        if (ret == 0)
                return 0;

        if (ret < 0) {
                fprintf(stderr, "ncd(%d): header read failed\n", nc->fd);
                exit(1);
        }

        /* First header on the wire decides the byte order for the run. */
        if (data_is_native == -1 && check_data_endianness(bnh.magic)) {
                fprintf(stderr, "ncd(%d): received data is bad\n", nc->fd);
                exit(1);
        }

        if (!data_is_native) {
                bnh.magic = be32_to_cpu(bnh.magic);
                bnh.cpu = be32_to_cpu(bnh.cpu);
                bnh.max_cpus = be32_to_cpu(bnh.max_cpus);
                bnh.len = be32_to_cpu(bnh.len);
                bnh.cl_id = be32_to_cpu(bnh.cl_id);
                bnh.buf_size = be32_to_cpu(bnh.buf_size);
                bnh.buf_nr = be32_to_cpu(bnh.buf_nr);
                bnh.page_size = be32_to_cpu(bnh.page_size);
        }

        /* Low byte of the magic carries a version; mask it off to check. */
        if ((bnh.magic & 0xffffff00) != BLK_IO_TRACE_MAGIC) {
                fprintf(stderr, "ncd(%s:%d): bad data magic\n",
                        nc->ch->hostname, nc->fd);
                exit(1);
        }

        /* Latch the client's CPU count from its first valid header. */
        if (nc->ncpus == -1)
                nc->ncpus = bnh.max_cpus;

        /*
         * len == 0 means the other end is sending us a new connection/dpp
         * len == 1 means that the other end signalled end-of-run
         */
        dpp = nc_find_dpp(nc, &bnh);
        if (bnh.len == 0) {
                /*
                 * Just adding in the dpp above is enough
                 */
                ack_open_close(nc->fd, dpp->buts_name);
                nc->ch->cl_opens++;
        } else if (bnh.len == 1) {
                /*
                 * overload cpu count with dropped events
                 */
                dpp->drops = bnh.cpu;

                ack_open_close(nc->fd, dpp->buts_name);
                /* Last device closed: report stats, drop the whole host. */
                if (--nc->ch->cl_opens == 0) {
                        show_stats(&nc->ch->devpaths);
                        net_ch_remove(nc->ch, nc->ncpus);
                        return 1;
                }
        } else
                net_client_read_data(nc, dpp, &bnh);

        return 0;
}
2439
/*
 * Service readable client connections after a poll() wakeup.
 *
 * ns->pfds[0] is the listen fd; connection slots start at [1] and are
 * laid out in the same order as ns->conn_list (see net_setup_pfds()),
 * so the pfd pointer and the list cursor advance in lockstep.
 */
static void handle_client_data(struct net_server_s *ns, int events)
{
        struct cl_conn *nc;
        struct pollfd *pfd;
        struct list_head *p, *q;

        pfd = &ns->pfds[1];
        list_for_each_safe(p, q, &ns->conn_list) {
                if (pfd->revents & POLLIN) {
                        nc = list_entry(p, struct cl_conn, ns_head);

                        /*
                         * Stop early when a host was torn down (list and
                         * pfds are now stale) or all reported events have
                         * been consumed.
                         */
                        if (net_client_data(nc) || --events == 0)
                                break;
                }
                pfd++;
        }
}
2457
2458 static void net_setup_pfds(struct net_server_s *ns)
2459 {
2460         struct pollfd *pfd;
2461         struct list_head *p;
2462
2463         ns->pfds[0].fd = ns->listen_fd;
2464         ns->pfds[0].events = POLLIN;
2465
2466         pfd = &ns->pfds[1];
2467         __list_for_each(p, &ns->conn_list) {
2468                 struct cl_conn *nc = list_entry(p, struct cl_conn, ns_head);
2469
2470                 pfd->fd = nc->fd;
2471                 pfd->events = POLLIN;
2472                 pfd++;
2473         }
2474 }
2475
2476 static int net_server_handle_connections(struct net_server_s *ns)
2477 {
2478         int events;
2479
2480         printf("server: waiting for connections...\n");
2481
2482         while (!done) {
2483                 net_setup_pfds(ns);
2484                 events = poll(ns->pfds, ns->connects + 1, -1);
2485                 if (events < 0) {
2486                         if (errno != EINTR) {
2487                                 perror("FATAL: poll error");
2488                                 return 1;
2489                         }
2490                 } else if (events > 0) {
2491                         if (ns->pfds[0].revents & POLLIN) {
2492                                 net_add_connection(ns);
2493                                 events--;
2494                         }
2495
2496                         if (events)
2497                                 handle_client_data(ns, events);
2498                 }
2499         }
2500
2501         return 0;
2502 }
2503
2504 static int net_server(void)
2505 {
2506         int fd, opt;
2507         int ret = 1;
2508         struct net_server_s net_server;
2509         struct net_server_s *ns = &net_server;
2510
2511         memset(ns, 0, sizeof(*ns));
2512         INIT_LIST_HEAD(&ns->ch_list);
2513         INIT_LIST_HEAD(&ns->conn_list);
2514         ns->pfds = malloc(sizeof(struct pollfd));
2515
2516         fd = my_socket(AF_INET, SOCK_STREAM, 0);
2517         if (fd < 0) {
2518                 perror("server: socket");
2519                 goto out;
2520         }
2521
2522         opt = 1;
2523         if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) {
2524                 perror("setsockopt");
2525                 goto out;
2526         }
2527
2528         memset(&ns->addr, 0, sizeof(ns->addr));
2529         ns->addr.sin_family = AF_INET;
2530         ns->addr.sin_addr.s_addr = htonl(INADDR_ANY);
2531         ns->addr.sin_port = htons(net_port);
2532
2533         if (bind(fd, (struct sockaddr *) &ns->addr, sizeof(ns->addr)) < 0) {
2534                 perror("bind");
2535                 goto out;
2536         }
2537
2538         if (listen(fd, 1) < 0) {
2539                 perror("listen");
2540                 goto out;
2541         }
2542
2543         /*
2544          * The actual server looping is done here:
2545          */
2546         ns->listen_fd = fd;
2547         ret = net_server_handle_connections(ns);
2548
2549         /*
2550          * Clean up and return...
2551          */
2552 out:
2553         free(ns->pfds);
2554         return ret;
2555 }
2556
/*
 * Run a local (or net-client) trace session: set up the kernel-side
 * buffers, spawn one tracer thread per CPU, wait for completion, then
 * report statistics and tear everything down.
 *
 * Returns 0 on success, 1 if tracer devpath setup fails.
 */
static int run_tracers(void)
{
        atexit(exit_tracing);
        if (net_mode == Net_client)
                printf("blktrace: connecting to %s\n", hostname);

        /* Issue BLKTRACESETUP on each device before starting threads. */
        setup_buts();

        if (use_tracer_devpaths()) {
                if (setup_tracer_devpaths())
                        return 1;

                if (piped_output)
                        handle_list = handle_list_file;
                else
                        handle_list = handle_list_net;
        }

        /*
         * Only start the kernel traces if every per-CPU thread came up;
         * otherwise abort by stopping whatever did start.
         */
        start_tracers();
        if (nthreads_running == ncpus) {
                unblock_tracers();
                start_buts();
                if (net_mode == Net_client)
                        printf("blktrace: connected!\n");
                /* -w: SIGALRM ends the run after stop_watch seconds. */
                if (stop_watch)
                        alarm(stop_watch);
        } else
                stop_tracers();

        wait_tracers();
        if (nthreads_running == ncpus)
                show_stats(&devpaths);
        if (net_client_use_send())
                close_client_connections();
        del_tracers();

        return 0;
}
2595
/*
 * Entry point: parse arguments, install signal handlers, then run in
 * one of three modes -- teardown of an existing trace (-k), network
 * server (-l), or a normal tracing session.
 */
int main(int argc, char *argv[])
{
        int ret = 0;

        setlocale(LC_NUMERIC, "en_US");
        pagesize = getpagesize();
        ncpus = sysconf(_SC_NPROCESSORS_ONLN);
        if (ncpus < 0) {
                fprintf(stderr, "sysconf(_SC_NPROCESSORS_ONLN) failed %d/%s\n",
                        errno, strerror(errno));
                ret = 1;
                goto out;
        } else if (handle_args(argc, argv)) {
                ret = 1;
                goto out;
        }

        /* All termination paths funnel through handle_sigint(). */
        signal(SIGINT, handle_sigint);
        signal(SIGHUP, handle_sigint);
        signal(SIGTERM, handle_sigint);
        signal(SIGALRM, handle_sigint);
        /* A closed pipe/socket shows up as a write error, not a kill. */
        signal(SIGPIPE, SIG_IGN);

        if (kill_running_trace) {
                /* -k: tear down any running traces on the given devices. */
                struct devpath *dpp;
                struct list_head *p;

                __list_for_each(p, &devpaths) {
                        dpp = list_entry(p, struct devpath, head);
                        if (__stop_trace(dpp->fd)) {
                                fprintf(stderr,
                                        "BLKTRACETEARDOWN %s failed: %d/%s\n",
                                        dpp->path, errno, strerror(errno));
                        }
                }
        } else if (net_mode == Net_server) {
                if (output_name) {
                        fprintf(stderr, "-o ignored in server mode\n");
                        output_name = NULL;
                }
                ret = net_server();
        } else
                ret = run_tracers();

out:
        if (pfp)
                fclose(pfp);
        rel_devpaths();
        return ret;
}