blkparse: split off the timestamp correction code into a separate function
[blktrace.git] / blktrace.c
/*
 * block queue tracing application
 *
 * Copyright (C) 2005 Jens Axboe <axboe@suse.de>
 * Copyright (C) 2006 Jens Axboe <axboe@kernel.dk>
 *
 * Rewrite to have a single thread per CPU (managing all devices on that CPU)
 *      Alan D. Brunelle <alan.brunelle@hp.com> - January 2009
 *
 *  This program is free software; you can redistribute it and/or modify
 *  it under the terms of the GNU General Public License as published by
 *  the Free Software Foundation; either version 2 of the License, or
 *  (at your option) any later version.
 *
 *  This program is distributed in the hope that it will be useful,
 *  but WITHOUT ANY WARRANTY; without even the implied warranty of
 *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *  GNU General Public License for more details.
 *
 *  You should have received a copy of the GNU General Public License
 *  along with this program; if not, write to the Free Software
 *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
 *
 */

#include <errno.h>
#include <stdarg.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <fcntl.h>
#include <getopt.h>
#include <sched.h>
#include <unistd.h>
#include <poll.h>
#include <signal.h>
#include <pthread.h>
#include <locale.h>
#include <sys/ioctl.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <sys/vfs.h>
#include <sys/mman.h>
#include <sys/param.h>
#include <sys/time.h>
#include <sys/resource.h>
#include <sys/socket.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <netdb.h>
#include <sys/sendfile.h>

#include "btt/list.h"
#include "blktrace.h"

/*
 * You may want to increase this even more, if you are logging at a high
 * rate and see skipped/missed events
 */
#define BUF_SIZE                (512 * 1024)
#define BUF_NR                  (4)

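/*
 * With the defaults above, each traced device consumes BUF_SIZE * BUF_NR =
 * 512 KiB * 4 = 2 MiB of relay buffer space per CPU; the -b (KiB) and -n
 * options scale this at run time via buf_size/buf_nr below.
 */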
#define FILE_VBUF_SIZE          (128 * 1024)

#define DEBUGFS_TYPE            (0x64626720)
#define TRACE_NET_PORT          (8462)

enum {
        Net_none = 0,
        Net_server,
        Net_client,
};

enum thread_status {
        Th_running,
        Th_leaving,
        Th_error
};

/*
 * Generic stats collected: nevents can be _roughly_ estimated by data_read
 * (discounting pdu...)
 *
 * These fields are updated w/ pdc_dr_update & pdc_nev_update below.
 */
struct pdc_stats {
        unsigned long long data_read;
        unsigned long long nevents;
};

struct devpath {
        struct list_head head;
        char *path;                     /* path to device special file */
        char *buts_name;                /* name returned from bt kernel code */
        struct pdc_stats *stats;
        int fd, ncpus;
        unsigned long long drops;

        /*
         * For piped output only:
         *
         * Each tracer will have a tracer_devpath_head that it will add new
         * data onto. Its list is protected by tracer_devpath_head.mutex,
         * and the tracer signals the processing thread using the dp_cond,
         * dp_mutex & dp_entries variables below.
         */
        struct tracer_devpath_head *heads;

        /*
         * For network server mode only:
         */
        struct cl_host *ch;
        u32 cl_id;
        time_t cl_connect_time;
        int setup_done; /* ioctl BLKTRACESETUP done */
        struct io_info *ios;
};

/*
 * For piped output to stdout we will have each tracer thread (one per dev)
 * tack buffers read from the relay queues on a per-device list.
 *
 * The main thread will then collect trace buffers from each of the lists
 * in turn.
 *
 * We will use a mutex to guard each of the trace_buf lists. The tracers
 * can then signal the main thread using <dp_cond,dp_mutex> and
 * dp_entries. (When a tracer adds an entry while dp_entries is 0, it
 * signals; while dp_entries is 0, the main thread waits for that signal.)
 *
 * adb: It may be better just to have a large buffer per tracer per dev,
 * and then use it as a ring-buffer. This would certainly cut down a lot
 * of malloc/free thrashing, at the cost of more memory movements
 * (potentially).
 */
struct trace_buf {
        struct list_head head;
        struct devpath *dpp;
        void *buf;
        int cpu, len;
};

struct tracer_devpath_head {
        pthread_mutex_t mutex;
        struct list_head head;
        struct trace_buf *prev;
};

/*
 * Used to handle the mmap() interfaces for output file (containing traces)
 */
struct mmap_info {
        void *fs_buf;
        unsigned long long fs_size, fs_max_size, fs_off, fs_buf_len;
        unsigned long buf_size, buf_nr;
        int pagesize;
};

/*
 * Each thread doing work on a (client) side of blktrace will have one
 * of these. The ios array contains input/output information, pfds holds
 * poll() data. The volatiles provide flags to/from the main executing
 * thread.
 */
struct tracer {
        struct list_head head;
        struct io_info *ios;
        struct pollfd *pfds;
        pthread_t thread;
        int cpu, nios;
        volatile int status, is_done;
};

/*
 * Networking stuff follows. We include a magic number so we know whether
 * to endianness convert or not.
 *
 * The len field is overloaded:
 *      0 - Indicates an "open" - allowing the server to set up for a dev/cpu
 *      1 - Indicates a "close" - Shut down connection in an orderly manner
 *      2 - Indicates a server "ack" of an open/close (see ack_open_close())
 *
 * The cpu field is overloaded on close: it will contain the number of drops.
 */
struct blktrace_net_hdr {
        u32 magic;              /* same as trace magic */
        char buts_name[32];     /* trace name */
        u32 cpu;                /* for which cpu */
        u32 max_cpus;
        u32 len;                /* length of following trace data */
        u32 cl_id;              /* id for set of client per-cpu connections */
        u32 buf_size;           /* client buf_size for this trace  */
        u32 buf_nr;             /* client buf_nr for this trace  */
        u32 page_size;          /* client page_size for this trace  */
};

/*
 * Each host encountered has one of these. The head is used to link this
 * on to the network server's ch_list. Connections associated with this
 * host are linked on conn_list, and any devices traced on that host
 * are connected on the devpaths list.
 */
struct cl_host {
        struct list_head head;
        struct list_head conn_list;
        struct list_head devpaths;
        struct net_server_s *ns;
        char *hostname;
        struct in_addr cl_in_addr;
        int connects, ndevs, cl_opens;
};

/*
 * Each connection (client to server socket ('fd')) has one of these. A
 * back reference to the host ('ch'), and list heads (for the host
 * list, and the network server conn_list) are also included.
 */
struct cl_conn {
        struct list_head ch_head, ns_head;
        struct cl_host *ch;
        int fd, ncpus;
        time_t connect_time;
};

/*
 * The network server requires some poll structures to be maintained -
 * one per connection currently on conn_list. The nchs/ch_list values
 * are for each host connected to this server. The addr field is used
 * as scratch as new connections are established.
 */
struct net_server_s {
        struct list_head conn_list;
        struct list_head ch_list;
        struct pollfd *pfds;
        int listen_fd, connects, nchs;
        struct sockaddr_in addr;
};

/*
 * This structure is (generically) used to provide information
 * for a read-to-write set of values.
 *
 * ifn & ifd represent input information
 *
 * ofn, ofd, ofp, obuf & mmap_info are used for the output file (optionally).
 */
struct io_info {
        struct devpath *dpp;
        FILE *ofp;
        char *obuf;
        struct cl_conn *nc;     /* Server network connection */

        /*
         * mmap controlled output files
         */
        struct mmap_info mmap_info;

        /*
         * Client network fields
         */
        unsigned int ready;
        unsigned long long data_queued;

        /*
         * Input/output file descriptors & names
         */
        int ifd, ofd;
        char ifn[MAXPATHLEN + 64];
        char ofn[MAXPATHLEN + 64];
};

static char blktrace_version[] = "2.0.0";

/*
 * Linkage to blktrace helper routines (trace conversions)
 */
int data_is_native = -1;

static int ndevs;
static int max_cpus;
static int ncpus;
static cpu_set_t *online_cpus;
static int pagesize;
static int act_mask = ~0U;
static int kill_running_trace;
static int stop_watch;
static int piped_output;

static char *debugfs_path = "/sys/kernel/debug";
static char *output_name;
static char *output_dir;

static unsigned long buf_size = BUF_SIZE;
static unsigned long buf_nr = BUF_NR;

static FILE *pfp;

static LIST_HEAD(devpaths);
static LIST_HEAD(tracers);

static volatile int done;

/*
 * tracer threads add entries, the main thread takes them off and processes
 * them. These protect the dp_entries variable.
 */
static pthread_cond_t dp_cond = PTHREAD_COND_INITIALIZER;
static pthread_mutex_t dp_mutex = PTHREAD_MUTEX_INITIALIZER;
static volatile int dp_entries;

/*
 * These synchronize master / thread interactions.
 */
static pthread_cond_t mt_cond = PTHREAD_COND_INITIALIZER;
static pthread_mutex_t mt_mutex = PTHREAD_MUTEX_INITIALIZER;
static volatile int nthreads_running;
static volatile int nthreads_leaving;
static volatile int nthreads_error;
static volatile int tracers_run;

/*
 * network cmd line params
 */
static struct sockaddr_in hostname_addr;
static char hostname[MAXHOSTNAMELEN];
static int net_port = TRACE_NET_PORT;
static int net_use_sendfile = 1;
static int net_mode;
static int *cl_fds;

static int (*handle_pfds)(struct tracer *, int, int);
static int (*handle_list)(struct tracer_devpath_head *, struct list_head *);

#define S_OPTS  "d:a:A:r:o:kw:vVb:n:D:lh:p:sI:"
static struct option l_opts[] = {
        {
                .name = "dev",
                .has_arg = required_argument,
                .flag = NULL,
                .val = 'd'
        },
        {
                .name = "input-devs",
                .has_arg = required_argument,
                .flag = NULL,
                .val = 'I'
        },
        {
                .name = "act-mask",
                .has_arg = required_argument,
                .flag = NULL,
                .val = 'a'
        },
        {
                .name = "set-mask",
                .has_arg = required_argument,
                .flag = NULL,
                .val = 'A'
        },
        {
                .name = "relay",
                .has_arg = required_argument,
                .flag = NULL,
                .val = 'r'
        },
        {
                .name = "output",
                .has_arg = required_argument,
                .flag = NULL,
                .val = 'o'
        },
        {
                .name = "kill",
                .has_arg = no_argument,
                .flag = NULL,
                .val = 'k'
        },
        {
                .name = "stopwatch",
                .has_arg = required_argument,
                .flag = NULL,
                .val = 'w'
        },
        {
                .name = "version",
                .has_arg = no_argument,
                .flag = NULL,
                .val = 'v'
        },
        {
                .name = "version",
                .has_arg = no_argument,
                .flag = NULL,
                .val = 'V'
        },
        {
                .name = "buffer-size",
                .has_arg = required_argument,
                .flag = NULL,
                .val = 'b'
        },
        {
                .name = "num-sub-buffers",
                .has_arg = required_argument,
                .flag = NULL,
                .val = 'n'
        },
        {
                .name = "output-dir",
                .has_arg = required_argument,
                .flag = NULL,
                .val = 'D'
        },
        {
                .name = "listen",
                .has_arg = no_argument,
                .flag = NULL,
                .val = 'l'
        },
        {
                .name = "host",
                .has_arg = required_argument,
                .flag = NULL,
                .val = 'h'
        },
        {
                .name = "port",
                .has_arg = required_argument,
                .flag = NULL,
                .val = 'p'
        },
        {
                .name = "no-sendfile",
                .has_arg = no_argument,
                .flag = NULL,
                .val = 's'
        },
        {
                .name = NULL,
        }
};

static char usage_str[] = "\n\n" \
        "-d <dev>             | --dev=<dev>\n" \
        "[ -r <debugfs path>  | --relay=<debugfs path> ]\n" \
        "[ -o <file>          | --output=<file>]\n" \
        "[ -D <dir>           | --output-dir=<dir>]\n" \
        "[ -w <time>          | --stopwatch=<time>]\n" \
        "[ -a <action field>  | --act-mask=<action field>]\n" \
        "[ -A <action mask>   | --set-mask=<action mask>]\n" \
        "[ -b <size>          | --buffer-size]\n" \
        "[ -n <number>        | --num-sub-buffers=<number>]\n" \
        "[ -l                 | --listen]\n" \
        "[ -h <hostname>      | --host=<hostname>]\n" \
        "[ -p <port number>   | --port=<port number>]\n" \
        "[ -s                 | --no-sendfile]\n" \
        "[ -I <devs file>     | --input-devs=<devs file>]\n" \
        "[ -v <version>       | --version]\n" \
        "[ -V <version>       | --version]\n"

        "\t-d Use specified device. May also be given last after options\n" \
        "\t-r Path to mounted debugfs, defaults to /sys/kernel/debug\n" \
        "\t-o File(s) to send output to\n" \
        "\t-D Directory to prepend to output file names\n" \
        "\t-w Stop after defined time, in seconds\n" \
        "\t-a Only trace specified actions. See documentation\n" \
        "\t-A Give trace mask as a single value. See documentation\n" \
        "\t-b Sub buffer size in KiB (default 512)\n" \
        "\t-n Number of sub buffers (default 4)\n" \
        "\t-l Run in network listen mode (blktrace server)\n" \
        "\t-h Run in network client mode, connecting to the given host\n" \
        "\t-p Network port to use (default 8462)\n" \
        "\t-s Make the network client NOT use sendfile() to transfer data\n" \
        "\t-I Add devices found in <devs file>\n" \
        "\t-v Print program version info\n" \
        "\t-V Print program version info\n\n";

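/*
 * For instance, an invocation using the options above:
 *      % blktrace -d /dev/sda -o trace -D /tmp -w 30
 * traces /dev/sda for 30 seconds, writing one /tmp/trace.blktrace.<cpu>
 * file per CPU (see fill_ofname() below for the naming rules).
 */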
static void clear_events(struct pollfd *pfd)
{
        pfd->events = 0;
        pfd->revents = 0;
}

static inline int net_client_use_sendfile(void)
{
        return net_mode == Net_client && net_use_sendfile;
}

static inline int net_client_use_send(void)
{
        return net_mode == Net_client && !net_use_sendfile;
}

static inline int use_tracer_devpaths(void)
{
        return piped_output || net_client_use_send();
}

static inline int in_addr_eq(struct in_addr a, struct in_addr b)
{
        return a.s_addr == b.s_addr;
}

static inline void pdc_dr_update(struct devpath *dpp, int cpu, int data_read)
{
        dpp->stats[cpu].data_read += data_read;
}

static inline void pdc_nev_update(struct devpath *dpp, int cpu, int nevents)
{
        dpp->stats[cpu].nevents += nevents;
}

static void show_usage(char *prog)
{
        fprintf(stderr, "Usage: %s %s", prog, usage_str);
}

/*
 * Create a timespec 'msec' milliseconds into the future
 */
static inline void make_timespec(struct timespec *tsp, long delta_msec)
{
        struct timeval now;

        gettimeofday(&now, NULL);
        tsp->tv_sec = now.tv_sec;
        tsp->tv_nsec = 1000L * now.tv_usec;

        tsp->tv_nsec += (delta_msec * 1000000L);
        if (tsp->tv_nsec >= 1000000000L) {
                long secs = tsp->tv_nsec / 1000000000L;

                tsp->tv_sec += secs;
                tsp->tv_nsec -= (secs * 1000000000L);
        }
}

/*
 * Wait with a timeout (50 msec) so that callers periodically re-check
 * their predicates (e.g. the done/is_done flags) even if a wakeup is missed
 */
static void t_pthread_cond_wait(pthread_cond_t *cond, pthread_mutex_t *mutex)
{
        struct timespec ts;

        make_timespec(&ts, 50);
        pthread_cond_timedwait(cond, mutex, &ts);
}

static void unblock_tracers(void)
{
        pthread_mutex_lock(&mt_mutex);
        tracers_run = 1;
        pthread_cond_broadcast(&mt_cond);
        pthread_mutex_unlock(&mt_mutex);
}

static void tracer_wait_unblock(struct tracer *tp)
{
        pthread_mutex_lock(&mt_mutex);
        while (!tp->is_done && !tracers_run)
                pthread_cond_wait(&mt_cond, &mt_mutex);
        pthread_mutex_unlock(&mt_mutex);
}

static void tracer_signal_ready(struct tracer *tp,
                                enum thread_status th_status,
                                int status)
{
        pthread_mutex_lock(&mt_mutex);
        tp->status = status;

        if (th_status == Th_running)
                nthreads_running++;
        else if (th_status == Th_error)
                nthreads_error++;
        else
                nthreads_leaving++;

        pthread_cond_signal(&mt_cond);
        pthread_mutex_unlock(&mt_mutex);
}

static void wait_tracers_ready(int ncpus_started)
{
        pthread_mutex_lock(&mt_mutex);
        while ((nthreads_running + nthreads_error) < ncpus_started)
                t_pthread_cond_wait(&mt_cond, &mt_mutex);
        pthread_mutex_unlock(&mt_mutex);
}

static void wait_tracers_leaving(void)
{
        pthread_mutex_lock(&mt_mutex);
        while (nthreads_leaving < nthreads_running)
                t_pthread_cond_wait(&mt_cond, &mt_mutex);
        pthread_mutex_unlock(&mt_mutex);
}

static void init_mmap_info(struct mmap_info *mip)
{
        mip->buf_size = buf_size;
        mip->buf_nr = buf_nr;
        mip->pagesize = pagesize;
}

static void net_close_connection(int *fd)
{
        shutdown(*fd, SHUT_RDWR);
        close(*fd);
        *fd = -1;
}

static void dpp_free(struct devpath *dpp)
{
        if (dpp->stats)
                free(dpp->stats);
        if (dpp->ios)
                free(dpp->ios);
        if (dpp->path)
                free(dpp->path);
        if (dpp->buts_name)
                free(dpp->buts_name);
        free(dpp);
}

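/*
 * Pin the calling thread to 'cpu' so each tracer thread reads only its
 * own CPU's relay files; returns 0 on success or errno on failure.
 */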
static int lock_on_cpu(int cpu)
{
        cpu_set_t *cpu_mask;
        size_t size;

        cpu_mask = CPU_ALLOC(max_cpus);
        size = CPU_ALLOC_SIZE(max_cpus);

        CPU_ZERO_S(size, cpu_mask);
        CPU_SET_S(cpu, size, cpu_mask);
        if (sched_setaffinity(0, size, cpu_mask) < 0) {
                CPU_FREE(cpu_mask);
                return errno;
        }

        CPU_FREE(cpu_mask);
        return 0;
}

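/*
 * Bump the soft (and, if needed, hard) limit for 'resource' by 'increase'.
 * Returns 1 if the new limit was set, 0 otherwise (with errno preserved).
 */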
static int increase_limit(int resource, rlim_t increase)
{
        struct rlimit rlim;
        int save_errno = errno;

        if (!getrlimit(resource, &rlim)) {
                rlim.rlim_cur += increase;
                if (rlim.rlim_cur >= rlim.rlim_max)
                        rlim.rlim_max = rlim.rlim_cur + increase;

                if (!setrlimit(resource, &rlim))
                        return 1;
        }

        errno = save_errno;
        return 0;
}

static int handle_open_failure(void)
{
        if (errno == ENFILE || errno == EMFILE)
                return increase_limit(RLIMIT_NOFILE, 16);
        return 0;
}

static int handle_mem_failure(size_t length)
{
        if (errno == ENFILE)
                return handle_open_failure();
        else if (errno == ENOMEM)
                return increase_limit(RLIMIT_MEMLOCK, 2 * length);
        return 0;
}

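/*
 * The my_*() wrappers below retry their underlying system call whenever
 * the failure was caused by a resource limit that handle_open_failure()
 * or handle_mem_failure() managed to raise.
 */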
static FILE *my_fopen(const char *path, const char *mode)
{
        FILE *fp;

        do {
                fp = fopen(path, mode);
        } while (fp == NULL && handle_open_failure());

        return fp;
}

static int my_open(const char *path, int flags)
{
        int fd;

        do {
                fd = open(path, flags);
        } while (fd < 0 && handle_open_failure());

        return fd;
}

static int my_socket(int domain, int type, int protocol)
{
        int fd;

        do {
                fd = socket(domain, type, protocol);
        } while (fd < 0 && handle_open_failure());

        return fd;
}

static int my_accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen)
{
        int fd;

        do {
                fd = accept(sockfd, addr, addrlen);
        } while (fd < 0 && handle_open_failure());

        return fd;
}

static void *my_mmap(void *addr, size_t length, int prot, int flags, int fd,
                     off_t offset)
{
        void *new;

        do {
                new = mmap(addr, length, prot, flags, fd, offset);
        } while (new == MAP_FAILED && handle_mem_failure(length));

        return new;
}

static int my_mlock(struct tracer *tp,
                    const void *addr, size_t len)
{
        int ret, retry = 0;

        do {
                ret = mlock(addr, len);
                if ((retry >= 10) && tp && tp->is_done)
                        break;
                retry++;
        } while (ret < 0 && handle_mem_failure(len));

        return ret;
}

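/*
 * Maintain a sliding, mlock()ed mmap() window over the output file: when
 * the current window can't hold 'maxlen' more bytes, unmap it, extend the
 * file with ftruncate(), and map a fresh page-aligned window of
 * buf_nr * buf_size bytes. Trace data is then read() directly into the
 * mapping (see handle_pfds_file() below).
 */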
static int setup_mmap(int fd, unsigned int maxlen,
                      struct mmap_info *mip,
                      struct tracer *tp)
{
        if (mip->fs_off + maxlen > mip->fs_buf_len) {
                unsigned long nr = max(16, mip->buf_nr);

                if (mip->fs_buf) {
                        munlock(mip->fs_buf, mip->fs_buf_len);
                        munmap(mip->fs_buf, mip->fs_buf_len);
                        mip->fs_buf = NULL;
                }

                mip->fs_off = mip->fs_size & (mip->pagesize - 1);
                mip->fs_buf_len = (nr * mip->buf_size) - mip->fs_off;
                mip->fs_max_size += mip->fs_buf_len;

                if (ftruncate(fd, mip->fs_max_size) < 0) {
                        perror("setup_mmap: ftruncate");
                        return 1;
                }

                mip->fs_buf = my_mmap(NULL, mip->fs_buf_len, PROT_WRITE,
                                      MAP_SHARED, fd,
                                      mip->fs_size - mip->fs_off);
                if (mip->fs_buf == MAP_FAILED) {
                        perror("setup_mmap: mmap");
                        return 1;
                }
                if (my_mlock(tp, mip->fs_buf, mip->fs_buf_len) < 0) {
                        perror("setup_mmap: mlock");
                        return 1;
                }
        }

        return 0;
}

static int __stop_trace(int fd)
{
        /*
         * Should be stopped, don't complain if it isn't
         */
        ioctl(fd, BLKTRACESTOP);
        return ioctl(fd, BLKTRACETEARDOWN);
}

static int write_data(char *buf, int len)
{
        int ret;

rewrite:
        ret = fwrite(buf, len, 1, pfp);
        if (ferror(pfp) || ret != 1) {
                if (errno == EINTR) {
                        clearerr(pfp);
                        goto rewrite;
                }

                if (!piped_output || (errno != EPIPE && errno != EBADF)) {
                        fprintf(stderr, "write(%d) failed: %d/%s\n",
                                len, errno, strerror(errno));
                }
                goto err;
        }

        fflush(pfp);
        return 0;

err:
        clearerr(pfp);
        return 1;
}

/*
 * Returns the number of bytes read (successfully)
 */
static int __net_recv_data(int fd, void *buf, unsigned int len)
{
        unsigned int bytes_left = len;

        while (bytes_left && !done) {
                int ret = recv(fd, buf, bytes_left, MSG_WAITALL);

                if (ret == 0)
                        break;
                else if (ret < 0) {
                        if (errno == EAGAIN) {
                                usleep(50);
                                continue;
                        }
                        perror("server: net_recv_data: recv failed");
                        break;
                } else {
                        buf += ret;
                        bytes_left -= ret;
                }
        }

        return len - bytes_left;
}

static int net_recv_data(int fd, void *buf, unsigned int len)
{
        return __net_recv_data(fd, buf, len);
}

/*
 * Returns number of bytes written
 */
static int net_send_data(int fd, void *buf, unsigned int buf_len)
{
        int ret;
        unsigned int bytes_left = buf_len;

        while (bytes_left) {
                ret = send(fd, buf, bytes_left, 0);
                if (ret < 0) {
                        perror("send");
                        break;
                }

                buf += ret;
                bytes_left -= ret;
        }

        return buf_len - bytes_left;
}

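/*
 * Build and send a blktrace_net_hdr for (buts_name, cpu) with payload
 * length 'len'; returns 0 on success, non-zero on a short send.
 */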
static int net_send_header(int fd, int cpu, char *buts_name, int len)
{
        struct blktrace_net_hdr hdr;

        memset(&hdr, 0, sizeof(hdr));

        hdr.magic = BLK_IO_TRACE_MAGIC;
        memset(hdr.buts_name, 0, sizeof(hdr.buts_name));
        strncpy(hdr.buts_name, buts_name, sizeof(hdr.buts_name));
        hdr.buts_name[sizeof(hdr.buts_name) - 1] = '\0';
        hdr.cpu = cpu;
        hdr.max_cpus = max_cpus;
        hdr.len = len;
        hdr.cl_id = getpid();
        hdr.buf_size = buf_size;
        hdr.buf_nr = buf_nr;
        hdr.page_size = pagesize;

        return net_send_data(fd, &hdr, sizeof(hdr)) != sizeof(hdr);
}

static void net_send_open_close(int fd, int cpu, char *buts_name, int len)
{
        struct blktrace_net_hdr ret_hdr;

        net_send_header(fd, cpu, buts_name, len);
        net_recv_data(fd, &ret_hdr, sizeof(ret_hdr));
}

static void net_send_open(int fd, int cpu, char *buts_name)
{
        net_send_open_close(fd, cpu, buts_name, 0);
}

static void net_send_close(int fd, char *buts_name, int drops)
{
        /*
         * Overload CPU w/ number of drops
         *
         * XXX: Need to clear/set done around call - done=1 (which
         * is true here) stops reads from happening... :-(
         */
        done = 0;
        net_send_open_close(fd, drops, buts_name, 1);
        done = 1;
}

static void ack_open_close(int fd, char *buts_name)
{
        net_send_header(fd, 0, buts_name, 2);
}

static void net_send_drops(int fd)
{
        struct list_head *p;

        __list_for_each(p, &devpaths) {
                struct devpath *dpp = list_entry(p, struct devpath, head);

                net_send_close(fd, dpp->buts_name, dpp->drops);
        }
}

/*
 * Returns:
 *       0: "EOF"
 *       1: OK
 *      -1: Error
 */
static int net_get_header(struct cl_conn *nc, struct blktrace_net_hdr *bnh)
{
        int bytes_read;
        int fl = fcntl(nc->fd, F_GETFL);

        fcntl(nc->fd, F_SETFL, fl | O_NONBLOCK);
        bytes_read = __net_recv_data(nc->fd, bnh, sizeof(*bnh));
        fcntl(nc->fd, F_SETFL, fl & ~O_NONBLOCK);

        if (bytes_read == sizeof(*bnh))
                return 1;
        else if (bytes_read == 0)
                return 0;
        else
                return -1;
}

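/*
 * Resolve the -h argument into hostname_addr: try it as a dotted quad
 * first, then fall back to gethostbyname(), canonicalizing 'hostname'
 * on success.
 */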
static int net_setup_addr(void)
{
        struct sockaddr_in *addr = &hostname_addr;

        memset(addr, 0, sizeof(*addr));
        addr->sin_family = AF_INET;
        addr->sin_port = htons(net_port);

        if (inet_aton(hostname, &addr->sin_addr) != 1) {
                struct hostent *hent;
retry:
                hent = gethostbyname(hostname);
                if (!hent) {
                        if (h_errno == TRY_AGAIN) {
                                usleep(100);
                                goto retry;
                        } else if (h_errno == NO_RECOVERY) {
                                fprintf(stderr, "gethostbyname(%s): "
                                        "non-recoverable error encountered\n",
                                        hostname);
                        } else {
                                /*
                                 * HOST_NOT_FOUND, NO_ADDRESS or NO_DATA
                                 */
                                fprintf(stderr, "Host %s not found\n",
                                        hostname);
                        }
                        return 1;
                }

                memcpy(&addr->sin_addr, hent->h_addr, 4);
                memset(hostname, 0, sizeof(hostname));
                strncpy(hostname, hent->h_name, sizeof(hostname));
                hostname[sizeof(hostname) - 1] = '\0';
        }

        return 0;
}

static int net_setup_client(void)
{
        int fd;
        struct sockaddr_in *addr = &hostname_addr;

        fd = my_socket(AF_INET, SOCK_STREAM, 0);
        if (fd < 0) {
                perror("client: socket");
                return -1;
        }

        if (connect(fd, (struct sockaddr *)addr, sizeof(*addr)) < 0) {
                if (errno == ECONNREFUSED)
                        fprintf(stderr,
                                "\nclient: Connection to %s refused, "
                                "perhaps the server is not started?\n\n",
                                hostname);
                else
                        perror("client: connect");

                close(fd);
                return -1;
        }

        return fd;
}

static int open_client_connections(void)
{
        int cpu;
        size_t alloc_size = CPU_ALLOC_SIZE(max_cpus);

        /*
         * Sized by max_cpus (not ncpus): online CPU numbers may be sparse,
         * and cl_fds is indexed by CPU number below.
         */
        cl_fds = calloc(max_cpus, sizeof(*cl_fds));
        for (cpu = 0; cpu < max_cpus; cpu++) {
                if (!CPU_ISSET_S(cpu, alloc_size, online_cpus))
                        continue;
                cl_fds[cpu] = net_setup_client();
                if (cl_fds[cpu] < 0)
                        goto err;
        }
        return 0;

err:
        while (cpu > 0)
                close(cl_fds[--cpu]);
        free(cl_fds);
        return 1;
}

static void close_client_connections(void)
{
        if (cl_fds) {
                int cpu, *fdp;
                size_t alloc_size = CPU_ALLOC_SIZE(max_cpus);

                for (cpu = 0, fdp = cl_fds; cpu < max_cpus; cpu++, fdp++) {
                        if (!CPU_ISSET_S(cpu, alloc_size, online_cpus))
                                continue;
                        if (*fdp >= 0) {
                                net_send_drops(*fdp);
                                net_close_connection(fdp);
                        }
                }
                free(cl_fds);
        }
}

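/*
 * Issue a BLKTRACESETUP ioctl for every device on the devpaths list,
 * using the configured sub-buffer geometry and action mask; returns the
 * number of devices that failed to set up.
 */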
static int setup_buts(void)
{
        struct list_head *p;
        int ret = 0;

        __list_for_each(p, &devpaths) {
                struct blk_user_trace_setup buts;
                struct devpath *dpp = list_entry(p, struct devpath, head);

                memset(&buts, 0, sizeof(buts));
                buts.buf_size = buf_size;
                buts.buf_nr = buf_nr;
                buts.act_mask = act_mask;

                if (ioctl(dpp->fd, BLKTRACESETUP, &buts) >= 0) {
                        dpp->ncpus = max_cpus;
                        dpp->buts_name = strdup(buts.name);
                        dpp->setup_done = 1;
                        if (dpp->stats)
                                free(dpp->stats);
                        dpp->stats = calloc(dpp->ncpus, sizeof(*dpp->stats));
                } else {
                        fprintf(stderr, "BLKTRACESETUP(2) %s failed: %d/%s\n",
                                dpp->path, errno, strerror(errno));
                        ret++;
                }
        }

        return ret;
}

static void start_buts(void)
{
        struct list_head *p;

        __list_for_each(p, &devpaths) {
                struct devpath *dpp = list_entry(p, struct devpath, head);

                if (ioctl(dpp->fd, BLKTRACESTART) < 0) {
                        fprintf(stderr, "BLKTRACESTART %s failed: %d/%s\n",
                                dpp->path, errno, strerror(errno));
                }
        }
}

static int get_drops(struct devpath *dpp)
{
        int fd, drops = 0;
        char fn[MAXPATHLEN + 64], tmp[256];

        snprintf(fn, sizeof(fn), "%s/block/%s/dropped", debugfs_path,
                 dpp->buts_name);

        fd = my_open(fn, O_RDONLY);
        if (fd < 0) {
                /*
                 * This may be ok: the kernel may not support
                 * dropped counts.
                 */
                if (errno != ENOENT)
                        fprintf(stderr, "Could not open %s: %d/%s\n",
                                fn, errno, strerror(errno));
                return 0;
        } else if (read(fd, tmp, sizeof(tmp)) < 0) {
                fprintf(stderr, "Could not read %s: %d/%s\n",
                        fn, errno, strerror(errno));
        } else
                drops = atoi(tmp);
        close(fd);

        return drops;
}

static void get_all_drops(void)
{
        struct list_head *p;

        __list_for_each(p, &devpaths) {
                struct devpath *dpp = list_entry(p, struct devpath, head);

                dpp->drops = get_drops(dpp);
        }
}

static inline struct trace_buf *alloc_trace_buf(int cpu, int bufsize)
{
        struct trace_buf *tbp;

        tbp = malloc(sizeof(*tbp) + bufsize);
        INIT_LIST_HEAD(&tbp->head);
        tbp->len = 0;
        tbp->buf = (void *)(tbp + 1);
        tbp->cpu = cpu;
        tbp->dpp = NULL;        /* Will be set when tbp is added */

        return tbp;
}

static void free_tracer_heads(struct devpath *dpp)
{
        int cpu;
        struct tracer_devpath_head *hd;

        for (cpu = 0, hd = dpp->heads; cpu < max_cpus; cpu++, hd++) {
                if (hd->prev)
                        free(hd->prev);

                pthread_mutex_destroy(&hd->mutex);
        }
        free(dpp->heads);
}

static int setup_tracer_devpaths(void)
{
        struct list_head *p;

        if (net_client_use_send())
                if (open_client_connections())
                        return 1;

        __list_for_each(p, &devpaths) {
                int cpu;
                struct tracer_devpath_head *hd;
                struct devpath *dpp = list_entry(p, struct devpath, head);

                dpp->heads = calloc(max_cpus, sizeof(struct tracer_devpath_head));
                for (cpu = 0, hd = dpp->heads; cpu < max_cpus; cpu++, hd++) {
                        INIT_LIST_HEAD(&hd->head);
                        pthread_mutex_init(&hd->mutex, NULL);
                        hd->prev = NULL;
                }
        }

        return 0;
}

static inline void add_trace_buf(struct devpath *dpp, int cpu,
                                                struct trace_buf **tbpp)
{
        struct trace_buf *tbp = *tbpp;
        struct tracer_devpath_head *hd = &dpp->heads[cpu];

        tbp->dpp = dpp;

        pthread_mutex_lock(&hd->mutex);
        list_add_tail(&tbp->head, &hd->head);
        pthread_mutex_unlock(&hd->mutex);

        *tbpp = alloc_trace_buf(cpu, buf_size);
}

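/*
 * Producer/consumer half of the <dp_cond, dp_mutex, dp_entries> protocol
 * described above: tracers bump dp_entries (signalling when it was 0),
 * while the main thread drains entries, decrements the count, and sleeps
 * whenever it is 0.
 */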
static inline void incr_entries(int entries_handled)
{
        pthread_mutex_lock(&dp_mutex);
        if (dp_entries == 0)
                pthread_cond_signal(&dp_cond);
        dp_entries += entries_handled;
        pthread_mutex_unlock(&dp_mutex);
}

static void decr_entries(int handled)
{
        pthread_mutex_lock(&dp_mutex);
        dp_entries -= handled;
        pthread_mutex_unlock(&dp_mutex);
}

static int wait_empty_entries(void)
{
        pthread_mutex_lock(&dp_mutex);
        while (!done && dp_entries == 0)
                t_pthread_cond_wait(&dp_cond, &dp_mutex);
        pthread_mutex_unlock(&dp_mutex);

        return !done;
}

static int add_devpath(char *path)
{
        int fd;
        struct devpath *dpp;
        struct list_head *p;

        /*
         * Verify device is not duplicated
         */
        __list_for_each(p, &devpaths) {
                struct devpath *tmp = list_entry(p, struct devpath, head);
                if (!strcmp(tmp->path, path))
                        return 0;
        }
        /*
         * Verify device is valid before going too far
         */
        fd = my_open(path, O_RDONLY | O_NONBLOCK);
        if (fd < 0) {
                fprintf(stderr, "Invalid path %s specified: %d/%s\n",
                        path, errno, strerror(errno));
                return 1;
        }

        dpp = malloc(sizeof(*dpp));
        memset(dpp, 0, sizeof(*dpp));
        dpp->path = strdup(path);
        dpp->fd = fd;
        ndevs++;
        list_add_tail(&dpp->head, &devpaths);

        return 0;
}

static void rel_devpaths(void)
{
        struct list_head *p, *q;

        list_for_each_safe(p, q, &devpaths) {
                struct devpath *dpp = list_entry(p, struct devpath, head);

                list_del(&dpp->head);
                if (dpp->setup_done)
                        __stop_trace(dpp->fd);
                close(dpp->fd);

                if (dpp->heads)
                        free_tracer_heads(dpp);

                dpp_free(dpp);
                ndevs--;
        }
}

static int flush_subbuf_net(struct trace_buf *tbp)
{
        int fd = cl_fds[tbp->cpu];
        struct devpath *dpp = tbp->dpp;

        if (net_send_header(fd, tbp->cpu, dpp->buts_name, tbp->len))
                return 1;
        else if (net_send_data(fd, tbp->buf, tbp->len) != tbp->len)
                return 1;

        return 0;
}

static int
handle_list_net(__attribute__((__unused__))struct tracer_devpath_head *hd,
                struct list_head *list)
{
        struct trace_buf *tbp;
        struct list_head *p, *q;
        int entries_handled = 0;

        list_for_each_safe(p, q, list) {
                tbp = list_entry(p, struct trace_buf, head);

                list_del(&tbp->head);
                entries_handled++;

                if (cl_fds[tbp->cpu] >= 0) {
                        if (flush_subbuf_net(tbp)) {
                                close(cl_fds[tbp->cpu]);
                                cl_fds[tbp->cpu] = -1;
                        }
                }

                free(tbp);
        }

        return entries_handled;
}

/*
 * Tack 'tbp's buf onto the tail of 'prev's buf
 */
static struct trace_buf *tb_combine(struct trace_buf *prev,
                                    struct trace_buf *tbp)
{
        unsigned long tot_len;

        tot_len = prev->len + tbp->len;
        if (tot_len > buf_size) {
                /*
                 * prev->head isn't on any list (prev was taken off of
                 * its list when it became the leftover buffer), so we
                 * can realloc the whole structure; the other fields are
                 * "static" and stay valid.
                 */
                prev = realloc(prev, sizeof(*prev) + tot_len);
                prev->buf = (void *)(prev + 1);
        }

        memcpy(prev->buf + prev->len, tbp->buf, tbp->len);
        prev->len = tot_len;

        free(tbp);
        return prev;
}

static int handle_list_file(struct tracer_devpath_head *hd,
                            struct list_head *list)
{
        int off, t_len, nevents;
        struct blk_io_trace *t;
        struct list_head *p, *q;
        int entries_handled = 0;
        struct trace_buf *tbp, *prev;

        prev = hd->prev;
        list_for_each_safe(p, q, list) {
                tbp = list_entry(p, struct trace_buf, head);
                list_del(&tbp->head);
                entries_handled++;

                /*
                 * If there was some leftover before, tack this new
                 * entry onto the tail of the previous one.
                 */
                if (prev)
                        tbp = tb_combine(prev, tbp);

                /*
                 * See how many whole traces there are - send them
                 * all out in one go.
                 */
                off = 0;
                nevents = 0;
                while (off + (int)sizeof(*t) <= tbp->len) {
                        t = (struct blk_io_trace *)(tbp->buf + off);
                        t_len = sizeof(*t) + t->pdu_len;
                        if (off + t_len > tbp->len)
                                break;

                        off += t_len;
                        nevents++;
                }
                if (nevents)
                        pdc_nev_update(tbp->dpp, tbp->cpu, nevents);

                /*
                 * Write any full set of traces, any remaining data is kept
                 * for the next pass.
                 */
                if (off) {
                        if (write_data(tbp->buf, off) || off == tbp->len) {
                                free(tbp);
                                prev = NULL;
                        } else {
                                /*
                                 * Move valid data to beginning of buffer
                                 */
                                tbp->len -= off;
                                memmove(tbp->buf, tbp->buf + off, tbp->len);
                                prev = tbp;
                        }
                } else
                        prev = tbp;
        }
        hd->prev = prev;

        return entries_handled;
}

static void __process_trace_bufs(void)
{
        int cpu;
        struct list_head *p;
        struct list_head list;
        int handled = 0;

        __list_for_each(p, &devpaths) {
                struct devpath *dpp = list_entry(p, struct devpath, head);
                struct tracer_devpath_head *hd = dpp->heads;

                for (cpu = 0; cpu < max_cpus; cpu++, hd++) {
                        pthread_mutex_lock(&hd->mutex);
                        if (list_empty(&hd->head)) {
                                pthread_mutex_unlock(&hd->mutex);
                                continue;
                        }

                        list_replace_init(&hd->head, &list);
                        pthread_mutex_unlock(&hd->mutex);

                        handled += handle_list(hd, &list);
                }
        }

        if (handled)
                decr_entries(handled);
}

static void process_trace_bufs(void)
{
        while (wait_empty_entries())
                __process_trace_bufs();
}

static void clean_trace_bufs(void)
{
        /*
         * No mutex needed here: we're only reading from the lists,
         * tracers are done
         */
        while (dp_entries)
                __process_trace_bufs();
}

static inline void read_err(int cpu, char *ifn)
{
        if (errno != EAGAIN)
                fprintf(stderr, "Thread %d failed read of %s: %d/%s\n",
                        cpu, ifn, errno, strerror(errno));
}

static int net_sendfile(struct io_info *iop)
{
        int ret;

        ret = sendfile(iop->ofd, iop->ifd, NULL, iop->ready);
        if (ret < 0) {
                perror("sendfile");
                return 1;
        } else if (ret < (int)iop->ready) {
                fprintf(stderr, "short sendfile send (%d of %d)\n",
                        ret, iop->ready);
                return 1;
        }

        return 0;
}

static inline int net_sendfile_data(struct tracer *tp, struct io_info *iop)
{
        struct devpath *dpp = iop->dpp;

        if (net_send_header(iop->ofd, tp->cpu, dpp->buts_name, iop->ready))
                return 1;
        return net_sendfile(iop);
}

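/*
 * Build the per-cpu output file name:
 *      <output_dir|.>/[subdir/]<output_name|buts_name>.blktrace.<cpu>
 * creating the destination directory on first use.
 */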
static int fill_ofname(char *dst, int dstlen, char *subdir, char *buts_name,
                       int cpu)
{
        int len;
        struct stat sb;

        if (output_dir)
                len = snprintf(dst, dstlen, "%s/", output_dir);
        else
                len = snprintf(dst, dstlen, "./");

        if (subdir)
                len += snprintf(dst + len, dstlen - len, "%s", subdir);

        if (stat(dst, &sb) < 0) {
                if (errno != ENOENT) {
                        fprintf(stderr,
                                "Destination dir %s stat failed: %d/%s\n",
                                dst, errno, strerror(errno));
                        return 1;
                }
                /*
                 * There is no synchronization between multiple threads
                 * trying to create the directory at once.  It's harmless
                 * to let them try, so just detect the problem and move on.
                 */
                if (mkdir(dst, 0755) < 0 && errno != EEXIST) {
                        fprintf(stderr,
                                "Destination dir %s can't be made: %d/%s\n",
                                dst, errno, strerror(errno));
                        return 1;
                }
        }

        if (output_name)
                snprintf(dst + len, dstlen - len, "%s.blktrace.%d",
                         output_name, cpu);
        else
                snprintf(dst + len, dstlen - len, "%s.blktrace.%d",
                         buts_name, cpu);

        return 0;
}

static int set_vbuf(struct io_info *iop, int mode, size_t size)
{
        iop->obuf = malloc(size);
        if (setvbuf(iop->ofp, iop->obuf, mode, size) != 0) {
                fprintf(stderr, "setvbuf(%s, %d) failed: %d/%s\n",
                        iop->dpp->path, (int)size, errno,
                        strerror(errno));
                free(iop->obuf);
                return 1;
        }

        return 0;
}

static int iop_open(struct io_info *iop, int cpu)
{
        char hostdir[MAXPATHLEN + 64];

        iop->ofd = -1;
        if (net_mode == Net_server) {
                struct cl_conn *nc = iop->nc;
                int len;

                len = snprintf(hostdir, sizeof(hostdir), "%s-",
                               nc->ch->hostname);
                len += strftime(hostdir + len, sizeof(hostdir) - len, "%F-%T/",
                                gmtime(&iop->dpp->cl_connect_time));
        } else {
                hostdir[0] = 0;
        }

        if (fill_ofname(iop->ofn, sizeof(iop->ofn), hostdir,
                        iop->dpp->buts_name, cpu))
                return 1;

        iop->ofp = my_fopen(iop->ofn, "w+");
        if (iop->ofp == NULL) {
                fprintf(stderr, "Open output file %s failed: %d/%s\n",
                        iop->ofn, errno, strerror(errno));
                return 1;
        }

        if (set_vbuf(iop, _IOLBF, FILE_VBUF_SIZE)) {
                fprintf(stderr, "set_vbuf for file %s failed: %d/%s\n",
                        iop->ofn, errno, strerror(errno));
                fclose(iop->ofp);
                return 1;
        }

        iop->ofd = fileno(iop->ofp);
        return 0;
}

static void close_iop(struct io_info *iop)
{
        struct mmap_info *mip = &iop->mmap_info;

        if (mip->fs_buf)
                munmap(mip->fs_buf, mip->fs_buf_len);

        if (!piped_output) {
                if (ftruncate(fileno(iop->ofp), mip->fs_size) < 0) {
                        fprintf(stderr,
                                "Ignoring err: ftruncate(%s): %d/%s\n",
                                iop->ofn, errno, strerror(errno));
                }
        }

        if (iop->ofp)
                fclose(iop->ofp);
        if (iop->obuf)
                free(iop->obuf);
}

static void close_ios(struct tracer *tp)
{
        while (tp->nios > 0) {
                struct io_info *iop = &tp->ios[--tp->nios];

                iop->dpp->drops = get_drops(iop->dpp);
                if (iop->ifd >= 0)
                        close(iop->ifd);

                if (iop->ofp)
                        close_iop(iop);
                else if (iop->ofd >= 0) {
                        struct devpath *dpp = iop->dpp;

                        net_send_close(iop->ofd, dpp->buts_name, dpp->drops);
                        net_close_connection(&iop->ofd);
                }
        }

        free(tp->ios);
        free(tp->pfds);
}

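/*
 * Set up one io_info per traced device for this tracer thread: open the
 * per-cpu relay file for reading and, depending on mode, either a local
 * output file (Net_none), a dedicated server connection (sendfile client),
 * or nothing extra (piped output and send-mode clients reuse other paths).
 */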
1654 static int open_ios(struct tracer *tp)
1655 {
1656         struct pollfd *pfd;
1657         struct io_info *iop;
1658         struct list_head *p;
1659
1660         tp->ios = calloc(ndevs, sizeof(struct io_info));
1661         memset(tp->ios, 0, ndevs * sizeof(struct io_info));
1662
1663         tp->pfds = calloc(ndevs, sizeof(struct pollfd));
1664         memset(tp->pfds, 0, ndevs * sizeof(struct pollfd));
1665
1666         tp->nios = 0;
1667         iop = tp->ios;
1668         pfd = tp->pfds;
1669         __list_for_each(p, &devpaths) {
1670                 struct devpath *dpp = list_entry(p, struct devpath, head);
1671
1672                 iop->dpp = dpp;
1673                 iop->ofd = -1;
1674                 snprintf(iop->ifn, sizeof(iop->ifn), "%s/block/%s/trace%d",
1675                         debugfs_path, dpp->buts_name, tp->cpu);
1676
1677                 iop->ifd = my_open(iop->ifn, O_RDONLY | O_NONBLOCK);
1678                 if (iop->ifd < 0) {
1679                         fprintf(stderr, "Thread %d failed open %s: %d/%s\n",
1680                                 tp->cpu, iop->ifn, errno, strerror(errno));
1681                         return 1;
1682                 }
1683
1684                 init_mmap_info(&iop->mmap_info);
1685
1686                 pfd->fd = iop->ifd;
1687                 pfd->events = POLLIN;
1688
1689                 if (piped_output)
1690                         ;
1691                 else if (net_client_use_sendfile()) {
1692                         iop->ofd = net_setup_client();
1693                         if (iop->ofd < 0)
1694                                 goto err;
1695                         net_send_open(iop->ofd, tp->cpu, dpp->buts_name);
1696                 } else if (net_mode == Net_none) {
1697                         if (iop_open(iop, tp->cpu))
1698                                 goto err;
1699                 } else {
1700                         /*
1701                          * This ensures that the server knows about all
1702                          * connections & devices before _any_ closes
1703                          */
1704                         net_send_open(cl_fds[tp->cpu], tp->cpu, dpp->buts_name);
1705                 }
1706
1707                 pfd++;
1708                 iop++;
1709                 tp->nios++;
1710         }
1711
1712         return 0;
1713
1714 err:
1715         close(iop->ifd);        /* tp->nios _not_ bumped */
1716         close_ios(tp);
1717         return 1;
1718 }
1719
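/*
 * Local-file mode: append relay data directly into the mmap'ed region
 * of the per-CPU output file, growing the mapping as needed.
 */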
1720 static int handle_pfds_file(struct tracer *tp, int nevs, int force_read)
1721 {
1722         struct mmap_info *mip;
1723         int i, ret, nentries = 0;
1724         struct pollfd *pfd = tp->pfds;
1725         struct io_info *iop = tp->ios;
1726
1727         for (i = 0; nevs > 0 && i < ndevs; i++, pfd++, iop++) {
1728                 if (pfd->revents & POLLIN || force_read) {
1729                         mip = &iop->mmap_info;
1730
1731                         ret = setup_mmap(iop->ofd, buf_size, mip, tp);
1732                         if (ret < 0) {
1733                                 pfd->events = 0;
1734                                 break;
1735                         }
1736
1737                         ret = read(iop->ifd, mip->fs_buf + mip->fs_off,
1738                                    buf_size);
1739                         if (ret > 0) {
1740                                 pdc_dr_update(iop->dpp, tp->cpu, ret);
1741                                 mip->fs_size += ret;
1742                                 mip->fs_off += ret;
1743                                 nentries++;
1744                         } else if (ret == 0) {
                                /*
                                 * A zero-length read once we are done
                                 * means the channel is drained; stop
                                 * polling this fd.
                                 */
1749                                 if (tp->is_done)
1750                                         clear_events(pfd);
1751                         } else {
1752                                 read_err(tp->cpu, iop->ifn);
1753                                 if (errno != EAGAIN || tp->is_done)
1754                                         clear_events(pfd);
1755                         }
1756                         nevs--;
1757                 }
1758         }
1759
1760         return nentries;
1761 }
1762
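/*
 * Network client in sendfile mode: fstat() the relay file to see how
 * much new data arrived since the last pass and ship the delta to the
 * server with sendfile(), avoiding a copy through user space.
 */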
1763 static int handle_pfds_netclient(struct tracer *tp, int nevs, int force_read)
1764 {
1765         struct stat sb;
1766         int i, nentries = 0;
1767         struct pollfd *pfd = tp->pfds;
1768         struct io_info *iop = tp->ios;
1769
1770         for (i = 0; i < ndevs; i++, pfd++, iop++) {
1771                 if (pfd->revents & POLLIN || force_read) {
1772                         if (fstat(iop->ifd, &sb) < 0) {
1773                                 perror(iop->ifn);
1774                                 pfd->events = 0;
1775                         } else if (sb.st_size > (off_t)iop->data_queued) {
1776                                 iop->ready = sb.st_size - iop->data_queued;
1777                                 iop->data_queued = sb.st_size;
1778
1779                                 if (!net_sendfile_data(tp, iop)) {
1780                                         pdc_dr_update(iop->dpp, tp->cpu,
1781                                                       iop->ready);
1782                                         nentries++;
1783                                 } else
1784                                         clear_events(pfd);
1785                         }
1786                         if (--nevs == 0)
1787                                 break;
1788                 }
1789         }
1790
1791         if (nentries)
1792                 incr_entries(nentries);
1793
1794         return nentries;
1795 }
1796
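/*
 * Piped-output and net-send modes: read relay data into a freshly
 * allocated trace buf and queue it for the processing thread.
 */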
1797 static int handle_pfds_entries(struct tracer *tp, int nevs, int force_read)
1798 {
1799         int i, nentries = 0;
1800         struct trace_buf *tbp;
1801         struct pollfd *pfd = tp->pfds;
1802         struct io_info *iop = tp->ios;
1803
1804         tbp = alloc_trace_buf(tp->cpu, buf_size);
1805         for (i = 0; i < ndevs; i++, pfd++, iop++) {
1806                 if (pfd->revents & POLLIN || force_read) {
1807                         tbp->len = read(iop->ifd, tbp->buf, buf_size);
1808                         if (tbp->len > 0) {
1809                                 pdc_dr_update(iop->dpp, tp->cpu, tbp->len);
1810                                 add_trace_buf(iop->dpp, tp->cpu, &tbp);
1811                                 nentries++;
1812                         } else if (tbp->len == 0) {
                                /*
                                 * A zero-length read once we are done
                                 * means the channel is drained; stop
                                 * polling this fd.
                                 */
1817                                 if (tp->is_done)
1818                                         clear_events(pfd);
1819                         } else {
1820                                 read_err(tp->cpu, iop->ifn);
1821                                 if (errno != EAGAIN || tp->is_done)
1822                                         clear_events(pfd);
1823                         }
1824                         if (!piped_output && --nevs == 0)
1825                                 break;
1826                 }
1827         }
1828         free(tbp);
1829
1830         if (nentries)
1831                 incr_entries(nentries);
1832
1833         return nentries;
1834 }
1835
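/*
 * Per-CPU tracer thread: pin to its CPU, open the I/O paths, then poll
 * and drain relay data until told to stop; a final forced-read pass
 * pulls out whatever the kernel buffered after the trace was stopped.
 */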
1836 static void *thread_main(void *arg)
1837 {
1838         int ret, ndone, to_val;
1839         struct tracer *tp = arg;
1840
1841         ret = lock_on_cpu(tp->cpu);
1842         if (ret)
1843                 goto err;
1844
1845         ret = open_ios(tp);
1846         if (ret)
1847                 goto err;
1848
1849         if (piped_output)
1850                 to_val = 50;            /* Frequent partial handles */
1851         else
                to_val = 500;           /* 1/2 second intervals */

1855         tracer_signal_ready(tp, Th_running, 0);
1856         tracer_wait_unblock(tp);
1857
        while (!tp->is_done) {
                ndone = poll(tp->pfds, ndevs, to_val);
                if (ndone < 0) {
                        if (errno != EINTR)
                                fprintf(stderr,
                                        "Thread %d poll failed: %d/%s\n",
                                        tp->cpu, errno, strerror(errno));
                } else if (ndone || piped_output)
                        (void)handle_pfds(tp, ndone, piped_output);
        }
1866
1867         /*
1868          * Trace is stopped, pull data until we get a short read
1869          */
1870         while (handle_pfds(tp, ndevs, 1) > 0)
1871                 ;
1872
1873         close_ios(tp);
1874         tracer_signal_ready(tp, Th_leaving, 0);
1875         return NULL;
1876
1877 err:
1878         tracer_signal_ready(tp, Th_error, ret);
1879         return NULL;
1880 }
1881
1882 static int start_tracer(int cpu)
1883 {
        int ret;
        struct tracer *tp;

        tp = calloc(1, sizeof(*tp));
        if (!tp)
                return 1;

        INIT_LIST_HEAD(&tp->head);
        tp->status = 0;
        tp->cpu = cpu;

        /*
         * pthread_create() returns its error number directly and does
         * not set errno.
         */
        ret = pthread_create(&tp->thread, NULL, thread_main, tp);
        if (ret) {
                fprintf(stderr, "FAILED to start thread on CPU %d: %d/%s\n",
                        cpu, ret, strerror(ret));
                free(tp);
                return 1;
        }
1899
1900         list_add_tail(&tp->head, &tracers);
1901         return 0;
1902 }
1903
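/*
 * Touch an empty output file for each traced device for this CPU;
 * used for offline CPUs so downstream tools see a dense CPU number
 * space.
 */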
1904 static int create_output_files(int cpu)
1905 {
1906         char fname[MAXPATHLEN + 64];
1907         struct list_head *p;
1908         FILE *f;
1909
1910         __list_for_each(p, &devpaths) {
1911                 struct devpath *dpp = list_entry(p, struct devpath, head);
1912
1913                 if (fill_ofname(fname, sizeof(fname), NULL, dpp->buts_name,
1914                                 cpu))
1915                         return 1;
1916                 f = my_fopen(fname, "w+");
1917                 if (!f)
1918                         return 1;
1919                 fclose(f);
1920         }
1921         return 0;
1922 }
1923
1924 static void start_tracers(void)
1925 {
1926         int cpu, started = 0;
1927         struct list_head *p;
1928         size_t alloc_size = CPU_ALLOC_SIZE(max_cpus);
1929
1930         for (cpu = 0; cpu < max_cpus; cpu++) {
1931                 if (!CPU_ISSET_S(cpu, alloc_size, online_cpus)) {
1932                         /*
1933                          * Create fake empty output files so that other tools
1934                          * like blkparse don't have to bother with sparse CPU
1935                          * number space.
1936                          */
1937                         if (create_output_files(cpu))
1938                                 break;
1939                         continue;
1940                 }
1941                 if (start_tracer(cpu))
1942                         break;
1943                 started++;
1944         }
1945
1946         wait_tracers_ready(started);
1947
1948         __list_for_each(p, &tracers) {
1949                 struct tracer *tp = list_entry(p, struct tracer, head);
1950                 if (tp->status)
1951                         fprintf(stderr,
1952                                 "FAILED to start thread on CPU %d: %d/%s\n",
1953                                 tp->cpu, tp->status, strerror(tp->status));
1954         }
1955 }
1956
1957 static void stop_tracers(void)
1958 {
1959         struct list_head *p;
1960
1961         /*
1962          * Stop the tracing - makes the tracer threads clean up quicker.
1963          */
1964         __list_for_each(p, &devpaths) {
1965                 struct devpath *dpp = list_entry(p, struct devpath, head);
1966                 (void)ioctl(dpp->fd, BLKTRACESTOP);
1967         }
1968
1969         /*
1970          * Tell each tracer to quit
1971          */
1972         __list_for_each(p, &tracers) {
1973                 struct tracer *tp = list_entry(p, struct tracer, head);
1974                 tp->is_done = 1;
1975         }
1976         pthread_cond_broadcast(&mt_cond);
1977 }
1978
1979 static void del_tracers(void)
1980 {
1981         struct list_head *p, *q;
1982
1983         list_for_each_safe(p, q, &tracers) {
1984                 struct tracer *tp = list_entry(p, struct tracer, head);
1985
1986                 list_del(&tp->head);
1987                 free(tp);
1988         }
1989 }
1990
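/*
 * Drain queued trace bufs (piped/net-send modes), wait for all tracer
 * threads to signal they are leaving, reap them, and collect final
 * per-device drop counts.
 */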
1991 static void wait_tracers(void)
1992 {
1993         struct list_head *p;
1994
1995         if (use_tracer_devpaths())
1996                 process_trace_bufs();
1997
1998         wait_tracers_leaving();
1999
2000         __list_for_each(p, &tracers) {
2001                 int ret;
2002                 struct tracer *tp = list_entry(p, struct tracer, head);
2003
2004                 ret = pthread_join(tp->thread, NULL);
2005                 if (ret)
2006                         fprintf(stderr, "Thread join %d failed %d\n",
2007                                 tp->cpu, ret);
2008         }
2009
2010         if (use_tracer_devpaths())
2011                 clean_trace_bufs();
2012
2013         get_all_drops();
2014 }
2015
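/*
 * atexit() handler: ignore further signals, then stop and reap all
 * tracers and release device state.
 */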
2016 static void exit_tracing(void)
2017 {
2018         signal(SIGINT, SIG_IGN);
2019         signal(SIGHUP, SIG_IGN);
2020         signal(SIGTERM, SIG_IGN);
2021         signal(SIGALRM, SIG_IGN);
2022
2023         stop_tracers();
2024         wait_tracers();
2025         del_tracers();
2026         rel_devpaths();
2027 }
2028
2029 static void handle_sigint(__attribute__((__unused__)) int sig)
2030 {
2031         done = 1;
2032         stop_tracers();
2033 }
2034
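/*
 * Per-device summary: events and KiB read per CPU, plus totals and a
 * warning with the drop ratio if any events were lost.
 */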
2035 static void show_stats(struct list_head *devpaths)
2036 {
2037         FILE *ofp;
2038         struct list_head *p;
2039         unsigned long long nevents, data_read;
2040         unsigned long long total_drops = 0;
2041         unsigned long long total_events = 0;
2042
2043         if (piped_output)
2044                 ofp = my_fopen("/dev/null", "w");
2045         else
2046                 ofp = stdout;
2047
2048         __list_for_each(p, devpaths) {
2049                 int cpu;
2050                 struct pdc_stats *sp;
2051                 struct devpath *dpp = list_entry(p, struct devpath, head);
2052
2053                 if (net_mode == Net_server)
2054                         printf("server: end of run for %s:%s\n",
2055                                 dpp->ch->hostname, dpp->buts_name);
2056
2057                 data_read = 0;
2058                 nevents = 0;
2059
2060                 fprintf(ofp, "=== %s ===\n", dpp->buts_name);
2061                 for (cpu = 0, sp = dpp->stats; cpu < dpp->ncpus; cpu++, sp++) {
2062                         /*
2063                          * Estimate events if not known...
2064                          */
2065                         if (sp->nevents == 0) {
2066                                 sp->nevents = sp->data_read /
2067                                                 sizeof(struct blk_io_trace);
2068                         }
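                        /*
                         * The estimate above is rough arithmetic:
                         * with the usual 48-byte struct blk_io_trace
                         * (an assumption), 4800 bytes read estimates
                         * ~100 events; events carrying pdu payloads
                         * make this an overestimate.
                         */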
2069
2070                         fprintf(ofp,
2071                                 "  CPU%3d: %20llu events, %8llu KiB data\n",
2072                                 cpu, sp->nevents, (sp->data_read + 1023) >> 10);
2073
2074                         data_read += sp->data_read;
2075                         nevents += sp->nevents;
2076                 }
2077
2078                 fprintf(ofp, "  Total:  %20llu events (dropped %llu),"
2079                              " %8llu KiB data\n", nevents,
                             dpp->drops, (data_read + 1023) >> 10);
2081
2082                 total_drops += dpp->drops;
2083                 total_events += (nevents + dpp->drops);
2084         }
2085
2086         fflush(ofp);
2087         if (piped_output)
2088                 fclose(ofp);
2089
2090         if (total_drops) {
2091                 double drops_ratio = 1.0;
2092
2093                 if (total_events)
2094                         drops_ratio = (double)total_drops/(double)total_events;
2095
2096                 fprintf(stderr, "\nYou have %llu (%5.1lf%%) dropped events\n"
2097                                 "Consider using a larger buffer size (-b) "
2098                                 "and/or more buffers (-n)\n",
2099                         total_drops, 100.0 * drops_ratio);
2100         }
2101 }
2102
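/*
 * Typical invocations (illustrative, not exhaustive):
 *
 *   blktrace -d /dev/sda -o - | blkparse -i -    pipe events to blkparse
 *   blktrace -l                                  run as a network server
 *   blktrace -d /dev/sda -h <server>             ship traces to a server
 */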
2103 static int handle_args(int argc, char *argv[])
2104 {
2105         int c, i;
2106         struct statfs st;
2107         int act_mask_tmp = 0;
2108
2109         while ((c = getopt_long(argc, argv, S_OPTS, l_opts, NULL)) >= 0) {
2110                 switch (c) {
2111                 case 'a':
2112                         i = find_mask_map(optarg);
2113                         if (i < 0) {
2114                                 fprintf(stderr, "Invalid action mask %s\n",
2115                                         optarg);
2116                                 return 1;
2117                         }
2118                         act_mask_tmp |= i;
2119                         break;
2120
2121                 case 'A':
2122                         if ((sscanf(optarg, "%x", &i) != 1) ||
2123                                                         !valid_act_opt(i)) {
2124                                 fprintf(stderr,
2125                                         "Invalid set action mask %s/0x%x\n",
2126                                         optarg, i);
2127                                 return 1;
2128                         }
2129                         act_mask_tmp = i;
2130                         break;
2131
2132                 case 'd':
2133                         if (add_devpath(optarg) != 0)
2134                                 return 1;
2135                         break;
2136
2137                 case 'I': {
2138                         char dev_line[256];
2139                         FILE *ifp = my_fopen(optarg, "r");
2140
2141                         if (!ifp) {
2142                                 fprintf(stderr,
2143                                         "Invalid file for devices %s\n",
2144                                         optarg);
2145                                 return 1;
2146                         }
2147
                        while (fscanf(ifp, "%255s", dev_line) == 1) {
2149                                 if (add_devpath(dev_line) != 0) {
2150                                         fclose(ifp);
2151                                         return 1;
2152                                 }
2153                         }
2154                         fclose(ifp);
2155                         break;
2156                 }
2157
2158                 case 'r':
2159                         debugfs_path = optarg;
2160                         break;
2161
2162                 case 'o':
2163                         output_name = optarg;
2164                         break;
2165                 case 'k':
2166                         kill_running_trace = 1;
2167                         break;
2168                 case 'w':
2169                         stop_watch = atoi(optarg);
2170                         if (stop_watch <= 0) {
2171                                 fprintf(stderr,
2172                                         "Invalid stopwatch value (%d secs)\n",
2173                                         stop_watch);
2174                                 return 1;
2175                         }
2176                         break;
2177                 case 'V':
2178                 case 'v':
2179                         printf("%s version %s\n", argv[0], blktrace_version);
2180                         exit(0);
2181                         /*NOTREACHED*/
2182                 case 'b':
2183                         buf_size = strtoul(optarg, NULL, 10);
                        if (buf_size == 0 || buf_size > 16 * 1024) {
2185                                 fprintf(stderr, "Invalid buffer size (%lu)\n",
2186                                         buf_size);
2187                                 return 1;
2188                         }
2189                         buf_size <<= 10;
2190                         break;
2191                 case 'n':
2192                         buf_nr = strtoul(optarg, NULL, 10);
                        if (buf_nr == 0) {
2194                                 fprintf(stderr,
2195                                         "Invalid buffer nr (%lu)\n", buf_nr);
2196                                 return 1;
2197                         }
2198                         break;
2199                 case 'D':
2200                         output_dir = optarg;
2201                         break;
2202                 case 'h':
2203                         net_mode = Net_client;
2204                         memset(hostname, 0, sizeof(hostname));
2205                         strncpy(hostname, optarg, sizeof(hostname));
2206                         hostname[sizeof(hostname) - 1] = '\0';
2207                         break;
2208                 case 'l':
2209                         net_mode = Net_server;
2210                         break;
2211                 case 'p':
2212                         net_port = atoi(optarg);
2213                         break;
2214                 case 's':
2215                         net_use_sendfile = 0;
2216                         break;
2217                 default:
2218                         show_usage(argv[0]);
2219                         exit(1);
2220                         /*NOTREACHED*/
2221                 }
2222         }
2223
2224         while (optind < argc)
2225                 if (add_devpath(argv[optind++]) != 0)
2226                         return 1;
2227
2228         if (net_mode != Net_server && ndevs == 0) {
2229                 show_usage(argv[0]);
2230                 return 1;
2231         }
2232
2233         if (statfs(debugfs_path, &st) < 0) {
2234                 fprintf(stderr, "Invalid debug path %s: %d/%s\n",
2235                         debugfs_path, errno, strerror(errno));
2236                 return 1;
2237         }
2238
2239         if (st.f_type != (long)DEBUGFS_TYPE) {
2240                 fprintf(stderr, "Debugfs is not mounted at %s\n", debugfs_path);
2241                 return 1;
2242         }
2243
2244         if (act_mask_tmp != 0)
2245                 act_mask = act_mask_tmp;
2246
2247         if (net_mode == Net_client && net_setup_addr())
2248                 return 1;
2249
2250         /*
2251          * Set up for appropriate PFD handler based upon output name.
2252          */
2253         if (net_client_use_sendfile())
2254                 handle_pfds = handle_pfds_netclient;
2255         else if (net_client_use_send())
2256                 handle_pfds = handle_pfds_entries;
2257         else if (output_name && (strcmp(output_name, "-") == 0)) {
2258                 piped_output = 1;
2259                 handle_pfds = handle_pfds_entries;
2260                 pfp = stdout;
2261                 if (setvbuf(pfp, NULL, _IONBF, 0)) {
2262                         perror("setvbuf stdout");
2263                         return 1;
2264                 }
2265         } else
2266                 handle_pfds = handle_pfds_file;
2267         return 0;
2268 }
2269
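/*
 * Register a newly accepted connection on both the per-host and
 * per-server lists; ns->pfds is kept sized at connects + 1 because
 * slot 0 is reserved for the listen fd (see net_setup_pfds()).
 */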
2270 static void ch_add_connection(struct net_server_s *ns, struct cl_host *ch,
2271                               int fd)
2272 {
2273         struct cl_conn *nc;
2274
2275         nc = malloc(sizeof(*nc));
2276         memset(nc, 0, sizeof(*nc));
2277
2278         time(&nc->connect_time);
2279         nc->ch = ch;
2280         nc->fd = fd;
2281         nc->ncpus = -1;
2282
2283         list_add_tail(&nc->ch_head, &ch->conn_list);
2284         ch->connects++;
2285
2286         list_add_tail(&nc->ns_head, &ns->conn_list);
2287         ns->connects++;
2288         ns->pfds = realloc(ns->pfds, (ns->connects+1) * sizeof(struct pollfd));
2289 }
2290
2291 static void ch_rem_connection(struct net_server_s *ns, struct cl_host *ch,
2292                               struct cl_conn *nc)
2293 {
2294         net_close_connection(&nc->fd);
2295
2296         list_del(&nc->ch_head);
2297         ch->connects--;
2298
2299         list_del(&nc->ns_head);
2300         ns->connects--;
2301         ns->pfds = realloc(ns->pfds, (ns->connects+1) * sizeof(struct pollfd));
2302
2303         free(nc);
2304 }
2305
2306 static struct cl_host *net_find_client_host(struct net_server_s *ns,
2307                                             struct in_addr cl_in_addr)
2308 {
2309         struct list_head *p;
2310
2311         __list_for_each(p, &ns->ch_list) {
2312                 struct cl_host *ch = list_entry(p, struct cl_host, head);
2313
2314                 if (in_addr_eq(ch->cl_in_addr, cl_in_addr))
2315                         return ch;
2316         }
2317
2318         return NULL;
2319 }
2320
2321 static struct cl_host *net_add_client_host(struct net_server_s *ns,
2322                                            struct sockaddr_in *addr)
2323 {
2324         struct cl_host *ch;
2325
2326         ch = malloc(sizeof(*ch));
2327         memset(ch, 0, sizeof(*ch));
2328
2329         ch->ns = ns;
2330         ch->cl_in_addr = addr->sin_addr;
2331         list_add_tail(&ch->head, &ns->ch_list);
2332         ns->nchs++;
2333
2334         ch->hostname = strdup(inet_ntoa(addr->sin_addr));
2335         printf("server: connection from %s\n", ch->hostname);
2336
2337         INIT_LIST_HEAD(&ch->conn_list);
2338         INIT_LIST_HEAD(&ch->devpaths);
2339
2340         return ch;
2341 }
2342
2343 static void device_done(struct devpath *dpp, int ncpus)
2344 {
2345         int cpu;
2346         struct io_info *iop;
2347
2348         for (cpu = 0, iop = dpp->ios; cpu < ncpus; cpu++, iop++)
2349                 close_iop(iop);
2350
2351         list_del(&dpp->head);
2352         dpp_free(dpp);
2353 }
2354
2355 static void net_ch_remove(struct cl_host *ch, int ncpus)
2356 {
2357         struct list_head *p, *q;
2358         struct net_server_s *ns = ch->ns;
2359
2360         list_for_each_safe(p, q, &ch->devpaths) {
2361                 struct devpath *dpp = list_entry(p, struct devpath, head);
2362                 device_done(dpp, ncpus);
2363         }
2364
2365         list_for_each_safe(p, q, &ch->conn_list) {
2366                 struct cl_conn *nc = list_entry(p, struct cl_conn, ch_head);
2367
2368                 ch_rem_connection(ns, ch, nc);
2369         }
2370
2371         list_del(&ch->head);
2372         ns->nchs--;
2373
2374         if (ch->hostname)
2375                 free(ch->hostname);
2376         free(ch);
2377 }
2378
2379 static void net_add_connection(struct net_server_s *ns)
2380 {
2381         int fd;
2382         struct cl_host *ch;
2383         socklen_t socklen = sizeof(ns->addr);
2384
2385         fd = my_accept(ns->listen_fd, (struct sockaddr *)&ns->addr, &socklen);
2386         if (fd < 0) {
2387                 /*
2388                  * This is OK: we just won't accept this connection,
2389                  * nothing fatal.
2390                  */
2391                 perror("accept");
2392         } else {
2393                 ch = net_find_client_host(ns, ns->addr.sin_addr);
2394                 if (!ch)
2395                         ch = net_add_client_host(ns, &ns->addr);
2396
2397                 ch_add_connection(ns, ch, fd);
2398         }
2399 }
2400
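/*
 * First time this device is seen on this connection: build the
 * server-side devpath and open one per-CPU output file via iop_open().
 */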
2401 static struct devpath *nc_add_dpp(struct cl_conn *nc,
2402                                   struct blktrace_net_hdr *bnh,
2403                                   time_t connect_time)
2404 {
2405         int cpu;
2406         struct io_info *iop;
2407         struct devpath *dpp;
2408
2409         dpp = malloc(sizeof(*dpp));
2410         memset(dpp, 0, sizeof(*dpp));
2411
2412         dpp->buts_name = strdup(bnh->buts_name);
2413         dpp->path = strdup(bnh->buts_name);
2414         dpp->fd = -1;
2415         dpp->ch = nc->ch;
2416         dpp->cl_id = bnh->cl_id;
2417         dpp->cl_connect_time = connect_time;
2418         dpp->ncpus = nc->ncpus;
        dpp->stats = calloc(dpp->ncpus, sizeof(*dpp->stats));
2421
2422         list_add_tail(&dpp->head, &nc->ch->devpaths);
2423         nc->ch->ndevs++;
2424
        dpp->ios = calloc(nc->ncpus, sizeof(*iop));
2427
2428         for (cpu = 0, iop = dpp->ios; cpu < nc->ncpus; cpu++, iop++) {
2429                 iop->dpp = dpp;
2430                 iop->nc = nc;
2431                 init_mmap_info(&iop->mmap_info);
2432
2433                 if (iop_open(iop, cpu))
2434                         goto err;
2435         }
2436
2437         return dpp;
2438
2439 err:
2440         /*
2441          * Need to unravel what's been done...
2442          */
2443         while (cpu >= 0)
2444                 close_iop(&dpp->ios[cpu--]);
2445         dpp_free(dpp);
2446
2447         return NULL;
2448 }
2449
2450 static struct devpath *nc_find_dpp(struct cl_conn *nc,
2451                                    struct blktrace_net_hdr *bnh)
2452 {
2453         struct list_head *p;
2454         time_t connect_time = nc->connect_time;
2455
2456         __list_for_each(p, &nc->ch->devpaths) {
2457                 struct devpath *dpp = list_entry(p, struct devpath, head);
2458
2459                 if (!strcmp(dpp->buts_name, bnh->buts_name))
2460                         return dpp;
2461
2462                 if (dpp->cl_id == bnh->cl_id)
2463                         connect_time = dpp->cl_connect_time;
2464         }
2465
2466         return nc_add_dpp(nc, bnh, connect_time);
2467 }
2468
2469 static void net_client_read_data(struct cl_conn *nc, struct devpath *dpp,
2470                                  struct blktrace_net_hdr *bnh)
2471 {
2472         int ret;
2473         struct io_info *iop = &dpp->ios[bnh->cpu];
2474         struct mmap_info *mip = &iop->mmap_info;
2475
        if (setup_mmap(iop->ofd, bnh->len, mip, NULL)) {
2477                 fprintf(stderr, "ncd(%s:%d): mmap failed\n",
2478                         nc->ch->hostname, nc->fd);
2479                 exit(1);
2480         }
2481
2482         ret = net_recv_data(nc->fd, mip->fs_buf + mip->fs_off, bnh->len);
2483         if (ret > 0) {
2484                 pdc_dr_update(dpp, bnh->cpu, ret);
2485                 mip->fs_size += ret;
2486                 mip->fs_off += ret;
2487         } else if (ret < 0)
2488                 exit(1);
2489 }
2490
2491 /*
2492  * Returns 1 if we closed a host - invalidates other polling information
2493  * that may be present.
2494  */
2495 static int net_client_data(struct cl_conn *nc)
2496 {
2497         int ret;
2498         struct devpath *dpp;
2499         struct blktrace_net_hdr bnh;
2500
2501         ret = net_get_header(nc, &bnh);
2502         if (ret == 0)
2503                 return 0;
2504
2505         if (ret < 0) {
2506                 fprintf(stderr, "ncd(%d): header read failed\n", nc->fd);
2507                 exit(1);
2508         }
2509
2510         if (data_is_native == -1 && check_data_endianness(bnh.magic)) {
2511                 fprintf(stderr, "ncd(%d): received data is bad\n", nc->fd);
2512                 exit(1);
2513         }
2514
2515         if (!data_is_native) {
2516                 bnh.magic = be32_to_cpu(bnh.magic);
2517                 bnh.cpu = be32_to_cpu(bnh.cpu);
2518                 bnh.max_cpus = be32_to_cpu(bnh.max_cpus);
2519                 bnh.len = be32_to_cpu(bnh.len);
2520                 bnh.cl_id = be32_to_cpu(bnh.cl_id);
2521                 bnh.buf_size = be32_to_cpu(bnh.buf_size);
2522                 bnh.buf_nr = be32_to_cpu(bnh.buf_nr);
2523                 bnh.page_size = be32_to_cpu(bnh.page_size);
2524         }
2525
2526         if ((bnh.magic & 0xffffff00) != BLK_IO_TRACE_MAGIC) {
2527                 fprintf(stderr, "ncd(%s:%d): bad data magic\n",
2528                         nc->ch->hostname, nc->fd);
2529                 exit(1);
2530         }
2531
2532         if (nc->ncpus == -1)
2533                 nc->ncpus = bnh.max_cpus;
2534
2535         /*
2536          * len == 0 means the other end is sending us a new connection/dpp
2537          * len == 1 means that the other end signalled end-of-run
2538          */
2539         dpp = nc_find_dpp(nc, &bnh);
2540         if (bnh.len == 0) {
2541                 /*
2542                  * Just adding in the dpp above is enough
2543                  */
2544                 ack_open_close(nc->fd, dpp->buts_name);
2545                 nc->ch->cl_opens++;
2546         } else if (bnh.len == 1) {
2547                 /*
2548                  * overload cpu count with dropped events
2549                  */
2550                 dpp->drops = bnh.cpu;
2551
2552                 ack_open_close(nc->fd, dpp->buts_name);
2553                 if (--nc->ch->cl_opens == 0) {
2554                         show_stats(&nc->ch->devpaths);
2555                         net_ch_remove(nc->ch, nc->ncpus);
2556                         return 1;
2557                 }
2558         } else
2559                 net_client_read_data(nc, dpp, &bnh);
2560
2561         return 0;
2562 }
2563
2564 static void handle_client_data(struct net_server_s *ns, int events)
2565 {
2566         struct cl_conn *nc;
2567         struct pollfd *pfd;
2568         struct list_head *p, *q;
2569
2570         pfd = &ns->pfds[1];
2571         list_for_each_safe(p, q, &ns->conn_list) {
2572                 if (pfd->revents & POLLIN) {
2573                         nc = list_entry(p, struct cl_conn, ns_head);
2574
2575                         if (net_client_data(nc) || --events == 0)
2576                                 break;
2577                 }
2578                 pfd++;
2579         }
2580 }
2581
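/*
 * Rebuild the poll set each pass: slot 0 is the listen fd, followed by
 * one slot per live client connection.
 */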
2582 static void net_setup_pfds(struct net_server_s *ns)
2583 {
2584         struct pollfd *pfd;
2585         struct list_head *p;
2586
2587         ns->pfds[0].fd = ns->listen_fd;
2588         ns->pfds[0].events = POLLIN;
2589
2590         pfd = &ns->pfds[1];
2591         __list_for_each(p, &ns->conn_list) {
2592                 struct cl_conn *nc = list_entry(p, struct cl_conn, ns_head);
2593
2594                 pfd->fd = nc->fd;
2595                 pfd->events = POLLIN;
2596                 pfd++;
2597         }
2598 }
2599
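/*
 * Main server loop: poll the listen fd and all client fds; new
 * connections are accepted first, then client data is drained.
 */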
2600 static int net_server_handle_connections(struct net_server_s *ns)
2601 {
2602         int events;
2603
2604         printf("server: waiting for connections...\n");
2605
2606         while (!done) {
2607                 net_setup_pfds(ns);
2608                 events = poll(ns->pfds, ns->connects + 1, -1);
2609                 if (events < 0) {
2610                         if (errno != EINTR) {
2611                                 perror("FATAL: poll error");
2612                                 return 1;
2613                         }
2614                 } else if (events > 0) {
2615                         if (ns->pfds[0].revents & POLLIN) {
2616                                 net_add_connection(ns);
2617                                 events--;
2618                         }
2619
2620                         if (events)
2621                                 handle_client_data(ns, events);
2622                 }
2623         }
2624
2625         return 0;
2626 }
2627
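/*
 * Server mode: bind and listen on net_port, then hand off to the
 * poll()-based connection loop above.
 */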
2628 static int net_server(void)
2629 {
2630         int fd, opt;
2631         int ret = 1;
2632         struct net_server_s net_server;
2633         struct net_server_s *ns = &net_server;
2634
2635         memset(ns, 0, sizeof(*ns));
2636         INIT_LIST_HEAD(&ns->ch_list);
2637         INIT_LIST_HEAD(&ns->conn_list);
2638         ns->pfds = malloc(sizeof(struct pollfd));
2639
2640         fd = my_socket(AF_INET, SOCK_STREAM, 0);
2641         if (fd < 0) {
2642                 perror("server: socket");
2643                 goto out;
2644         }
2645
2646         opt = 1;
2647         if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) {
2648                 perror("setsockopt");
2649                 goto out;
2650         }
2651
2652         memset(&ns->addr, 0, sizeof(ns->addr));
2653         ns->addr.sin_family = AF_INET;
2654         ns->addr.sin_addr.s_addr = htonl(INADDR_ANY);
2655         ns->addr.sin_port = htons(net_port);
2656
2657         if (bind(fd, (struct sockaddr *) &ns->addr, sizeof(ns->addr)) < 0) {
2658                 perror("bind");
2659                 goto out;
2660         }
2661
2662         if (listen(fd, 1) < 0) {
2663                 perror("listen");
2664                 goto out;
2665         }
2666
2667         /*
2668          * The actual server looping is done here:
2669          */
2670         ns->listen_fd = fd;
2671         ret = net_server_handle_connections(ns);
2672
2673         /*
2674          * Clean up and return...
2675          */
2676 out:
2677         free(ns->pfds);
2678         return ret;
2679 }
2680
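/*
 * Normal tracing entry point: arm the kernel-side trace buffers, start
 * one tracer thread per online CPU, and only unblock them (and start
 * the buts) once every thread has come up; otherwise tear it all down.
 */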
2681 static int run_tracers(void)
2682 {
2683         atexit(exit_tracing);
2684         if (net_mode == Net_client)
2685                 printf("blktrace: connecting to %s\n", hostname);
2686
2687         if (setup_buts())
2688                 return 1;
2689
2690         if (use_tracer_devpaths()) {
2691                 if (setup_tracer_devpaths())
2692                         return 1;
2693
2694                 if (piped_output)
2695                         handle_list = handle_list_file;
2696                 else
2697                         handle_list = handle_list_net;
2698         }
2699
2700         start_tracers();
2701         if (nthreads_running == ncpus) {
2702                 unblock_tracers();
2703                 start_buts();
2704                 if (net_mode == Net_client)
2705                         printf("blktrace: connected!\n");
2706                 if (stop_watch)
2707                         alarm(stop_watch);
2708         } else
2709                 stop_tracers();
2710
2711         wait_tracers();
2712         if (nthreads_running == ncpus)
2713                 show_stats(&devpaths);
2714         if (net_client_use_send())
2715                 close_client_connections();
2716         del_tracers();
2717
2718         return 0;
2719 }
2720
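/*
 * Build a cpu_set_t of online CPUs from /sys/devices/system/cpu/online;
 * also sets max_cpus to one past the highest online CPU number.
 */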
2721 static cpu_set_t *get_online_cpus(void)
2722 {
2723         FILE *cpus;
2724         cpu_set_t *set;
2725         size_t alloc_size;
2726         int cpuid, prevcpuid = -1;
2727         char nextch;
2728         int n, ncpu, curcpu = 0;
2729         int *cpu_nums;
2730
2731         ncpu = sysconf(_SC_NPROCESSORS_CONF);
2732         if (ncpu < 0)
2733                 return NULL;
2734
2735         cpu_nums = malloc(sizeof(int)*ncpu);
2736         if (!cpu_nums) {
2737                 errno = ENOMEM;
2738                 return NULL;
2739         }
2740
        /*
         * There is no easy way to get the maximum CPU number, so parse
         * the file once to find it, then create an appropriately sized
         * cpuset.
         */
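        /*
         * The file is a comma-separated list of CPU numbers and ranges,
         * e.g. "0-3,5,7-8\n": fscanf() yields the range start when the
         * trailing separator is '-', otherwise a lone CPU or range end.
         */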
        cpus = my_fopen("/sys/devices/system/cpu/online", "r");
        if (!cpus) {
                free(cpu_nums);
                return NULL;
        }
2747         for (;;) {
2748                 n = fscanf(cpus, "%d%c", &cpuid, &nextch);
2749                 if (n <= 0)
2750                         break;
2751                 if (n == 2 && nextch == '-') {
2752                         prevcpuid = cpuid;
2753                         continue;
2754                 }
2755                 if (prevcpuid == -1)
2756                         prevcpuid = cpuid;
2757                 while (prevcpuid <= cpuid) {
                        /* More CPUs listed than configured? */
                        if (curcpu >= ncpu) {
                                fclose(cpus);
                                free(cpu_nums);
                                errno = EINVAL;
                                return NULL;
                        }
2763                         cpu_nums[curcpu++] = prevcpuid++;
2764                 }
2765                 prevcpuid = -1;
2766         }
2767         fclose(cpus);
2768
        ncpu = curcpu;
        if (ncpu == 0) {
                free(cpu_nums);
                errno = EINVAL;
                return NULL;
        }
        max_cpus = cpu_nums[ncpu - 1] + 1;
2771
2772         /* Now that we have maximum cpu number, create a cpuset */
2773         set = CPU_ALLOC(max_cpus);
        if (!set) {
                free(cpu_nums);
                errno = ENOMEM;
                return NULL;
        }
2778         alloc_size = CPU_ALLOC_SIZE(max_cpus);
2779         CPU_ZERO_S(alloc_size, set);
2780
2781         for (curcpu = 0; curcpu < ncpu; curcpu++)
2782                 CPU_SET_S(cpu_nums[curcpu], alloc_size, set);
2783
2784         free(cpu_nums);
2785
2786         return set;
2787 }
2788
2789 int main(int argc, char *argv[])
2790 {
2791         int ret = 0;
2792
2793         setlocale(LC_NUMERIC, "en_US");
2794         pagesize = getpagesize();
2795         online_cpus = get_online_cpus();
2796         if (!online_cpus) {
2797                 fprintf(stderr, "cannot get online cpus %d/%s\n",
2798                         errno, strerror(errno));
2799                 ret = 1;
2800                 goto out;
2801         } else if (handle_args(argc, argv)) {
2802                 ret = 1;
2803                 goto out;
2804         }
2805
2806         ncpus = CPU_COUNT_S(CPU_ALLOC_SIZE(max_cpus), online_cpus);
2807         if (ndevs > 1 && output_name && strcmp(output_name, "-") != 0) {
2808                 fprintf(stderr, "-o not supported with multiple devices\n");
2809                 ret = 1;
2810                 goto out;
2811         }
2812
2813         signal(SIGINT, handle_sigint);
2814         signal(SIGHUP, handle_sigint);
2815         signal(SIGTERM, handle_sigint);
2816         signal(SIGALRM, handle_sigint);
2817         signal(SIGPIPE, SIG_IGN);
2818
2819         if (kill_running_trace) {
2820                 struct devpath *dpp;
2821                 struct list_head *p;
2822
2823                 __list_for_each(p, &devpaths) {
2824                         dpp = list_entry(p, struct devpath, head);
2825                         if (__stop_trace(dpp->fd)) {
2826                                 fprintf(stderr,
2827                                         "BLKTRACETEARDOWN %s failed: %d/%s\n",
2828                                         dpp->path, errno, strerror(errno));
2829                         }
2830                 }
2831         } else if (net_mode == Net_server) {
2832                 if (output_name) {
2833                         fprintf(stderr, "-o ignored in server mode\n");
2834                         output_name = NULL;
2835                 }
2836                 ret = net_server();
2837         } else
2838                 ret = run_tracers();
2839
2840 out:
2841         if (pfp)
2842                 fclose(pfp);
2843         rel_devpaths();
2844         return ret;
2845 }