[PATCH] Ignore -o (output_name) when in server mode
[blktrace.git] / blktrace.c
1 /*
2  * block queue tracing application
3  *
4  * Copyright (C) 2005 Jens Axboe <axboe@suse.de>
5  * Copyright (C) 2006 Jens Axboe <axboe@kernel.dk>
6  *
7  *  This program is free software; you can redistribute it and/or modify
8  *  it under the terms of the GNU General Public License as published by
9  *  the Free Software Foundation; either version 2 of the License, or
10  *  (at your option) any later version.
11  *
12  *  This program is distributed in the hope that it will be useful,
13  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
14  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
15  *  GNU General Public License for more details.
16  *
17  *  You should have received a copy of the GNU General Public License
18  *  along with this program; if not, write to the Free Software
19  *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
20  *
21  */
22 #include <pthread.h>
23 #include <sys/types.h>
24 #include <sys/stat.h>
25 #include <unistd.h>
26 #include <locale.h>
27 #include <signal.h>
28 #include <fcntl.h>
29 #include <string.h>
30 #include <sys/ioctl.h>
31 #include <sys/param.h>
32 #include <sys/statfs.h>
33 #include <sys/poll.h>
34 #include <sys/mman.h>
35 #include <sys/socket.h>
36 #include <stdio.h>
37 #include <stdlib.h>
38 #include <sched.h>
39 #include <ctype.h>
40 #include <getopt.h>
41 #include <errno.h>
42 #include <netinet/in.h>
43 #include <arpa/inet.h>
44 #include <netdb.h>
45 #include <sys/sendfile.h>
46
47 #include "blktrace.h"
48 #include "barrier.h"
49
static char blktrace_version[] = "0.99.1";

/*
 * You may want to increase this even more, if you are logging at a high
 * rate and see skipped/missed events
 */
#define BUF_SIZE        (512 * 1024)    /* default relay sub-buffer size (-b) */
#define BUF_NR          (4)             /* default number of sub-buffers (-n) */

/* stdio buffer size for per-cpu output files */
#define OFILE_BUF       (128 * 1024)

/* statfs() f_type magic for a debugfs mount ("dbg " in ASCII) */
#define DEBUGFS_TYPE    0x64626720

/* short option string; keep in sync with l_opts[] below */
#define S_OPTS  "d:a:A:r:o:kw:Vb:n:D:lh:p:s"
/*
 * Long command line options; each entry maps to the matching short
 * option character in S_OPTS.
 */
static struct option l_opts[] = {
        { "dev",                required_argument,      NULL,   'd' },
        { "act-mask",           required_argument,      NULL,   'a' },
        { "set-mask",           required_argument,      NULL,   'A' },
        { "relay",              required_argument,      NULL,   'r' },
        { "output",             required_argument,      NULL,   'o' },
        { "kill",               no_argument,            NULL,   'k' },
        { "stopwatch",          required_argument,      NULL,   'w' },
        { "version",            no_argument,            NULL,   'V' },
        { "buffer-size",        required_argument,      NULL,   'b' },
        { "num-sub-buffers",    required_argument,      NULL,   'n' },
        { "output-dir",         required_argument,      NULL,   'D' },
        { "listen",             no_argument,            NULL,   'l' },
        { "host",               required_argument,      NULL,   'h' },
        { "port",               required_argument,      NULL,   'p' },
        { "no-sendfile",        no_argument,            NULL,   's' },
        { NULL,                 0,                      NULL,   0 },
};
159
/*
 * One chunk of trace data copied out of a relay sub-buffer, queued
 * between a per-cpu reader thread and the writeout path.
 */
struct tip_subbuf {
        void *buf;              /* trace data (NULL in sendfile mode) */
        unsigned int len;       /* valid bytes in buf */
        unsigned int max_len;   /* allocated size of buf */
};

#define FIFO_SIZE       (1024)  /* should be plenty big! */
#define CL_SIZE         (128)   /* cache line, any bigger? */

/*
 * Single-producer/single-consumer ring of subbuf pointers. head and
 * tail are kept on separate cache lines to avoid false sharing.
 */
struct tip_subbuf_fifo {
        int tail __attribute__((aligned(CL_SIZE)));
        int head __attribute__((aligned(CL_SIZE)));
        struct tip_subbuf *q[FIFO_SIZE];
};
174
/*
 * Per-cpu reader state; one of these exists for every cpu of every
 * traced device (see for_each_tip()).
 */
struct thread_information {
        int cpu;                /* cpu this thread reads for */
        pthread_t thread;

        int fd;                 /* relay trace file descriptor */
        void *fd_buf;
        char fn[MAXPATHLEN + 64];       /* path of the relay trace file */

        FILE *ofile;            /* output stream (NULL for net client) */
        char *ofile_buffer;     /* stdio buffer, freed in close_thread() */
        off_t ofile_offset;
        int ofile_stdout;       /* non-zero when writing to stdout */
        int ofile_mmap;         /* non-zero when output file is mmap'ed */

        /* mode-dependent callbacks, wired up in fill_ops() */
        int (*get_subbuf)(struct thread_information *, unsigned int);
        int (*flush_subbuf)(struct thread_information *, struct tip_subbuf *);
        int (*read_data)(struct thread_information *, void *, unsigned int);

        unsigned long events_processed;
        unsigned long long data_read;   /* bytes consumed so far */
        unsigned long long data_queued; /* bytes queued for sendfile */
        struct device_information *device;

        int exited;             /* set by thread_main() on return */

        /*
         * piped fifo buffers
         */
        struct tip_subbuf_fifo fifo;
        struct tip_subbuf *leftover_ts; /* partial event kept between flushes */

        /*
         * mmap controlled output files
         */
        unsigned long long fs_size;     /* bytes written so far */
        unsigned long long fs_max_size; /* size the file is ftruncate'd to */
        unsigned long fs_off;           /* write offset inside fs_buf */
        void *fs_buf;                   /* current mapping, NULL if none */
        unsigned long fs_buf_len;       /* length of the current mapping */

        struct net_connection *nc;      /* server-side connection, if any */
};
217
/*
 * Per traced block device state; threads points at its per-cpu readers.
 */
struct device_information {
        int fd;                 /* device fd, -1 once the trace is stopped */
        char *path;             /* device path from the command line */
        char buts_name[32];     /* trace name returned by BLKTRACESETUP */
        volatile int trace_started;     /* accessed via dip_tracing() */
        unsigned long drop_count;       /* dropped events, sampled at stop */
        struct thread_information *threads;
        unsigned long buf_size; /* relay sub-buffer size for this device */
        unsigned long buf_nr;   /* number of relay sub-buffers */
        unsigned int page_size;

        /* server-side bookkeeping for the client this device came from */
        struct cl_host *ch;
        u32 cl_id;
        time_t cl_connect_time;
};
233
static int ncpus;                       /* cpus traced per device */
static struct thread_information *thread_information;
static int ndevs;                       /* entries in device_information */
static struct device_information *device_information;

/* command line option globals */
static char *debugfs_path;              /* -r: debugfs/relay mount point */
static char *output_name;               /* -o: base name, "-" means stdout */
static char *output_dir;                /* -D: directory for output files */
static int act_mask = ~0U;              /* -a/-A: event filter, default all */
static int kill_running_trace;          /* -k: tear down a stale trace */
static unsigned long buf_size = BUF_SIZE;       /* -b */
static unsigned long buf_nr = BUF_NR;           /* -n */
static unsigned int page_size;

/* main-loop exit flag, set by the SIGINT handler */
#define is_done()       (*(volatile int *)(&done))
static volatile int done;

#define is_trace_stopped()      (*(volatile int *)(&trace_stopped))
static volatile int trace_stopped;

#define is_stat_shown() (*(volatile int *)(&stat_shown))
static volatile int stat_shown;

/* presumably: -1 until trace endianness is determined -- see blktrace.h */
int data_is_native = -1;

static void exit_trace(int status);
261
/* read/write a device's tracing state without the compiler caching it */
#define dip_tracing(dip)        (*(volatile int *)(&(dip)->trace_started))
#define dip_set_tracing(dip, v) ((dip)->trace_started = (v))

/* walk __d over an array __di of __e device_information entries */
#define __for_each_dip(__d, __di, __e, __i)     \
        for (__i = 0, __d = __di; __i < __e; __i++, __d++)

#define for_each_dip(__d, __i)          \
        __for_each_dip(__d, device_information, ndevs, __i)
#define for_each_nc_dip(__nc, __d, __i)         \
        __for_each_dip(__d, (__nc)->ch->device_information, (__nc)->ch->ndevs, __i)

/* walk __t over the per-cpu reader threads of device __d */
#define __for_each_tip(__d, __t, __ncpus, __j)  \
        for (__j = 0, __t = (__d)->threads; __j < __ncpus; __j++, __t++)
#define for_each_tip(__d, __t, __j)     \
        __for_each_tip(__d, __t, ncpus, __j)
/* walk the list of connected client hosts (server mode) */
#define for_each_cl_host(__c)   \
        for (__c = cl_host_list; __c; __c = __c->list_next)
279
/*
 * networking stuff follows. we include a magic number so we know whether
 * to endianness convert or not
 */
struct blktrace_net_hdr {
        u32 magic;              /* same as trace magic */
        char buts_name[32];     /* trace name */
        u32 cpu;                /* for which cpu */
        u32 max_cpus;
        u32 len;                /* length of following trace data */
        u32 cl_id;              /* id for set of client per-cpu connections */
        u32 buf_size;           /* client buf_size for this trace  */
        u32 buf_nr;             /* client buf_nr for this trace  */
        u32 page_size;          /* client page_size for this trace  */
};

/* default TCP port for client/server mode */
#define TRACE_NET_PORT          (8462)

/* run modes: standalone, receiving server, or sending client */
enum {
        Net_none = 0,
        Net_server,
        Net_client,
};

/*
 * network cmd line params
 */
static char hostname[MAXHOSTNAMELEN];   /* --host argument */
static int net_port = TRACE_NET_PORT;   /* --port argument */
static int net_mode = 0;                /* Net_none/Net_server/Net_client */
static int net_use_sendfile = 1;        /* presumably cleared by -s; confirm */

/* server-side state for one connected client machine */
struct cl_host {
        struct cl_host *list_next;
        struct in_addr cl_in_addr;
        struct net_connection *net_connections;
        int nconn;
        struct device_information *device_information;
        int ndevs;
        int ncpus;
        int ndevs_done;
};

/* one accepted socket from a client */
struct net_connection {
        int in_fd;
        struct pollfd pfd;
        time_t connect_time;
        struct cl_host *ch;     /* host this connection belongs to */
        int ncpus;
};

#define NET_MAX_CL_HOSTS        (1024)
static struct cl_host *cl_host_list;    /* linked list of known clients */
static int cl_hosts;                    /* number of clients on the list */
static int net_connects;

static int *net_out_fd;                 /* per-cpu client output sockets */
338 static void handle_sigint(__attribute__((__unused__)) int sig)
339 {
340         struct device_information *dip;
341         int i;
342
343         /*
344          * stop trace so we can reap currently produced data
345          */
346         for_each_dip(dip, i) {
347                 if (dip->fd == -1)
348                         continue;
349                 if (ioctl(dip->fd, BLKTRACESTOP) < 0)
350                         perror("BLKTRACESTOP");
351         }
352
353         done = 1;
354 }
355
356 static int get_dropped_count(const char *buts_name)
357 {
358         int fd;
359         char tmp[MAXPATHLEN + 64];
360
361         snprintf(tmp, sizeof(tmp), "%s/block/%s/dropped",
362                  debugfs_path, buts_name);
363
364         fd = open(tmp, O_RDONLY);
365         if (fd < 0) {
366                 /*
367                  * this may be ok, if the kernel doesn't support dropped counts
368                  */
369                 if (errno == ENOENT)
370                         return 0;
371
372                 fprintf(stderr, "Couldn't open dropped file %s\n", tmp);
373                 return -1;
374         }
375
376         if (read(fd, tmp, sizeof(tmp)) < 0) {
377                 perror(tmp);
378                 close(fd);
379                 return -1;
380         }
381
382         close(fd);
383
384         return atoi(tmp);
385 }
386
387 static int start_trace(struct device_information *dip)
388 {
389         struct blk_user_trace_setup buts;
390
391         memset(&buts, 0, sizeof(buts));
392         buts.buf_size = dip->buf_size;
393         buts.buf_nr = dip->buf_nr;
394         buts.act_mask = act_mask;
395
396         if (ioctl(dip->fd, BLKTRACESETUP, &buts) < 0) {
397                 perror("BLKTRACESETUP");
398                 return 1;
399         }
400
401         if (ioctl(dip->fd, BLKTRACESTART) < 0) {
402                 perror("BLKTRACESTART");
403                 return 1;
404         }
405
406         memcpy(dip->buts_name, buts.name, sizeof(dip->buts_name));
407         dip_set_tracing(dip, 1);
408         return 0;
409 }
410
411 static void stop_trace(struct device_information *dip)
412 {
413         if (dip_tracing(dip) || kill_running_trace) {
414                 dip_set_tracing(dip, 0);
415
416                 /*
417                  * should be stopped, just don't complain if it isn't
418                  */
419                 ioctl(dip->fd, BLKTRACESTOP);
420
421                 if (ioctl(dip->fd, BLKTRACETEARDOWN) < 0)
422                         perror("BLKTRACETEARDOWN");
423
424                 close(dip->fd);
425                 dip->fd = -1;
426         }
427 }
428
429 static void stop_all_traces(void)
430 {
431         struct device_information *dip;
432         int i;
433
434         for_each_dip(dip, i) {
435                 dip->drop_count = get_dropped_count(dip->buts_name);
436                 stop_trace(dip);
437         }
438 }
439
440 static void wait_for_data(struct thread_information *tip, int timeout)
441 {
442         struct pollfd pfd = { .fd = tip->fd, .events = POLLIN };
443
444         while (!is_done()) {
445                 if (poll(&pfd, 1, timeout) < 0) {
446                         perror("poll");
447                         break;
448                 }
449                 if (pfd.revents & POLLIN)
450                         break;
451                 if (tip->ofile_stdout)
452                         break;
453         }
454 }
455
456 static int read_data_file(struct thread_information *tip, void *buf,
457                           unsigned int len)
458 {
459         int ret = 0;
460
461         do {
462                 wait_for_data(tip, 100);
463
464                 ret = read(tip->fd, buf, len);
465                 if (!ret)
466                         continue;
467                 else if (ret > 0)
468                         return ret;
469                 else {
470                         if (errno != EAGAIN) {
471                                 perror(tip->fn);
472                                 fprintf(stderr,"Thread %d failed read of %s\n",
473                                         tip->cpu, tip->fn);
474                                 break;
475                         }
476                         continue;
477                 }
478         } while (!is_done());
479
480         return ret;
481
482 }
483
484 static int read_data_net(struct thread_information *tip, void *buf,
485                          unsigned int len)
486 {
487         struct net_connection *nc = tip->nc;
488         unsigned int bytes_left = len;
489         int ret = 0;
490
491         do {
492                 ret = recv(nc->in_fd, buf, bytes_left, MSG_WAITALL);
493
494                 if (!ret)
495                         continue;
496                 else if (ret < 0) {
497                         if (errno != EAGAIN) {
498                                 perror(tip->fn);
499                                 fprintf(stderr, "server: failed read\n");
500                                 return 0;
501                         }
502                         continue;
503                 } else {
504                         buf += ret;
505                         bytes_left -= ret;
506                 }
507         } while (!is_done() && bytes_left);
508
509         return len - bytes_left;
510 }
511
/*
 * Pop the oldest subbuffer from the thread's fifo, or NULL if empty.
 * Consumer side of the lock-free single-producer/single-consumer ring.
 */
static inline struct tip_subbuf *
subbuf_fifo_dequeue(struct thread_information *tip)
{
        const int head = tip->fifo.head;
        const int next = (head + 1) & (FIFO_SIZE - 1);

        if (head != tip->fifo.tail) {
                struct tip_subbuf *ts = tip->fifo.q[head];

                /* read the slot before publishing the advanced head */
                store_barrier();
                tip->fifo.head = next;
                return ts;
        }

        return NULL;
}
528
/*
 * Push a subbuffer onto the thread's fifo. Producer side of the ring;
 * returns 0 on success, 1 if the fifo is full (the entry is dropped).
 */
static inline int subbuf_fifo_queue(struct thread_information *tip,
                                    struct tip_subbuf *ts)
{
        const int tail = tip->fifo.tail;
        const int next = (tail + 1) & (FIFO_SIZE - 1);

        if (next != tip->fifo.head) {
                tip->fifo.q[tail] = ts;
                /* make the slot visible before publishing the new tail */
                store_barrier();
                tip->fifo.tail = next;
                return 0;
        }

        fprintf(stderr, "fifo too small!\n");
        return 1;
}
545
/*
 * For file output, truncate and mmap the file appropriately
 */
static int mmap_subbuf(struct thread_information *tip, unsigned int maxlen)
{
        int ofd = fileno(tip->ofile);
        int ret;
        unsigned long nr;

        /*
         * extend file, if we have to. use chunks of 16 subbuffers.
         */
        if (tip->fs_off + maxlen > tip->fs_buf_len) {
                if (tip->fs_buf) {
                        munlock(tip->fs_buf, tip->fs_buf_len);
                        munmap(tip->fs_buf, tip->fs_buf_len);
                        tip->fs_buf = NULL;
                }

                /*
                 * keep the mapping offset page-aligned: the sub-page
                 * remainder of fs_size becomes the initial write offset
                 * inside the new mapping
                 */
                tip->fs_off = tip->fs_size & (tip->device->page_size - 1);
                nr = max(16, tip->device->buf_nr);
                tip->fs_buf_len = (nr * tip->device->buf_size) - tip->fs_off;
                tip->fs_max_size += tip->fs_buf_len;

                if (ftruncate(ofd, tip->fs_max_size) < 0) {
                        perror("ftruncate");
                        return -1;
                }

                tip->fs_buf = mmap(NULL, tip->fs_buf_len, PROT_WRITE,
                                   MAP_SHARED, ofd, tip->fs_size - tip->fs_off);
                if (tip->fs_buf == MAP_FAILED) {
                        perror("mmap");
                        return -1;
                }
                /* best-effort; result deliberately ignored */
                mlock(tip->fs_buf, tip->fs_buf_len);
        }

        /* read trace data straight into the mapped output file */
        ret = tip->read_data(tip, tip->fs_buf + tip->fs_off, maxlen);
        if (ret >= 0) {
                tip->data_read += ret;
                tip->fs_size += ret;
                tip->fs_off += ret;
                return 0;
        }

        return -1;
}
594
595 /*
596  * Use the copy approach for pipes and network
597  */
598 static int get_subbuf(struct thread_information *tip, unsigned int maxlen)
599 {
600         struct tip_subbuf *ts = malloc(sizeof(*ts));
601         int ret;
602
603         ts->buf = malloc(tip->device->buf_size);
604         ts->max_len = maxlen;
605
606         ret = tip->read_data(tip, ts->buf, ts->max_len);
607         if (ret > 0) {
608                 ts->len = ret;
609                 tip->data_read += ret;
610                 if (subbuf_fifo_queue(tip, ts))
611                         ret = -1;
612         }
613
614         if (ret <= 0) {
615                 free(ts->buf);
616                 free(ts);
617         }
618
619         return ret;
620 }
621
622 static void close_thread(struct thread_information *tip)
623 {
624         if (tip->fd != -1)
625                 close(tip->fd);
626         if (tip->ofile)
627                 fclose(tip->ofile);
628         if (tip->ofile_buffer)
629                 free(tip->ofile_buffer);
630         if (tip->fd_buf)
631                 free(tip->fd_buf);
632
633         tip->fd = -1;
634         tip->ofile = NULL;
635         tip->ofile_buffer = NULL;
636         tip->fd_buf = NULL;
637 }
638
639 static void tip_ftrunc_final(struct thread_information *tip)
640 {
641         /*
642          * truncate to right size and cleanup mmap
643          */
644         if (tip->ofile_mmap && tip->ofile) {
645                 int ofd = fileno(tip->ofile);
646
647                 if (tip->fs_buf)
648                         munmap(tip->fs_buf, tip->fs_buf_len);
649
650                 ftruncate(ofd, tip->fs_size);
651         }
652 }
653
/*
 * Per-cpu reader thread body: pin to the cpu, open that cpu's relay
 * trace file, then pull subbuffers until the run is done and the file
 * is drained.
 */
static void *thread_main(void *arg)
{
        struct thread_information *tip = arg;
        pid_t pid = getpid();
        cpu_set_t cpu_mask;

        CPU_ZERO(&cpu_mask);
        CPU_SET((tip->cpu), &cpu_mask);

        /*
         * NOTE(review): getpid() names the process, not this thread -
         * presumably per-thread affinity was intended; confirm whether
         * this actually pins each reader to its cpu.
         */
        if (sched_setaffinity(pid, sizeof(cpu_mask), &cpu_mask) == -1) {
                perror("sched_setaffinity");
                exit_trace(1);
        }

        snprintf(tip->fn, sizeof(tip->fn), "%s/block/%s/trace%d",
                        debugfs_path, tip->device->buts_name, tip->cpu);
        tip->fd = open(tip->fn, O_RDONLY);
        if (tip->fd < 0) {
                perror(tip->fn);
                fprintf(stderr,"Thread %d failed open of %s\n", tip->cpu,
                        tip->fn);
                exit_trace(1);
        }

        while (!is_done()) {
                if (tip->get_subbuf(tip, tip->device->buf_size) < 0)
                        break;
        }

        /*
         * trace is stopped, pull data until we get a short read
         */
        while (tip->get_subbuf(tip, tip->device->buf_size) > 0)
                ;

        tip_ftrunc_final(tip);
        tip->exited = 1;
        return NULL;
}
693
/*
 * Send an entire buffer over a socket, looping on partial sends.
 * Returns 0 on success, 1 on send error.
 */
static int write_data_net(int fd, void *buf, unsigned int buf_len)
{
        unsigned int left;
        int sent;

        for (left = buf_len; left != 0; left -= sent, buf += sent) {
                sent = send(fd, buf, left, 0);
                if (sent < 0) {
                        perror("send");
                        return 1;
                }
        }

        return 0;
}
712
713 static int net_send_header(struct thread_information *tip, unsigned int len)
714 {
715         struct blktrace_net_hdr hdr;
716
717         hdr.magic = BLK_IO_TRACE_MAGIC;
718         strcpy(hdr.buts_name, tip->device->buts_name);
719         hdr.cpu = tip->cpu;
720         hdr.max_cpus = ncpus;
721         hdr.len = len;
722         hdr.cl_id = getpid();
723         hdr.buf_size = tip->device->buf_size;
724         hdr.buf_nr = tip->device->buf_nr;
725         hdr.page_size = tip->device->page_size;
726         
727         return write_data_net(net_out_fd[tip->cpu], &hdr, sizeof(hdr));
728 }
729
730 /*
731  * send header with 0 length to signal end-of-run
732  */
733 static void net_client_send_close(void)
734 {
735         struct device_information *dip;
736         struct blktrace_net_hdr hdr;
737         int i;
738
739         for_each_dip(dip, i) {
740                 hdr.magic = BLK_IO_TRACE_MAGIC;
741                 hdr.max_cpus = ncpus;
742                 hdr.len = 0;
743                 strcpy(hdr.buts_name, dip->buts_name);
744                 hdr.cpu = get_dropped_count(dip->buts_name);
745                 hdr.cl_id = getpid();
746                 hdr.buf_size = dip->buf_size;
747                 hdr.buf_nr = dip->buf_nr;
748                 hdr.page_size = dip->page_size;
749
750                 write_data_net(net_out_fd[0], &hdr, sizeof(hdr));
751         }
752
753 }
754
755 static int flush_subbuf_net(struct thread_information *tip,
756                             struct tip_subbuf *ts)
757 {
758         if (net_send_header(tip, ts->len))
759                 return -1;
760         if (write_data_net(net_out_fd[tip->cpu], ts->buf, ts->len))
761                 return -1;
762
763         free(ts->buf);
764         free(ts);
765         return 1;
766 }
767
768 static int net_sendfile(struct thread_information *tip, struct tip_subbuf *ts)
769 {
770         int ret = sendfile(net_out_fd[tip->cpu], tip->fd, NULL, ts->len);
771
772         if (ret < 0) {
773                 perror("sendfile");
774                 return 1;
775         } else if (ret < (int) ts->len) {
776                 fprintf(stderr, "short sendfile send (%d of %d)\n", ret, ts->len);
777                 return 1;
778         }
779
780         return 0;
781 }
782
783 static int flush_subbuf_sendfile(struct thread_information *tip,
784                                  struct tip_subbuf *ts)
785 {
786         int ret = -1;
787
788         if (net_send_header(tip, ts->len))
789                 goto err;
790         if (net_sendfile(tip, ts))
791                 goto err;
792
793         tip->data_read += ts->len;
794         ret = 1;
795 err:
796         free(ts);
797         return ret;
798 }
799
800 static int get_subbuf_sendfile(struct thread_information *tip,
801                                __attribute__((__unused__)) unsigned int maxlen)
802 {
803         struct tip_subbuf *ts;
804         struct stat sb;
805         unsigned int ready;
806
807         wait_for_data(tip, -1);
808
809         if (fstat(tip->fd, &sb) < 0) {
810                 perror("trace stat");
811                 return -1;
812         }
813
814         ready = sb.st_size - tip->data_queued;
815         if (!ready) {
816                 usleep(1000);
817                 return 0;
818         }
819
820         ts = malloc(sizeof(*ts));
821         ts->buf = NULL;
822         ts->max_len = 0;
823         ts->len = ready;
824         tip->data_queued += ready;
825
826         if (flush_subbuf_sendfile(tip, ts) < 0)
827                 return -1;
828
829         return ready;
830 }
831
832 static int write_data(struct thread_information *tip, void *buf,
833                       unsigned int buf_len)
834 {
835         int ret;
836
837         if (!buf_len)
838                 return 0;
839
840         ret = fwrite(buf, buf_len, 1, tip->ofile);
841         if (ferror(tip->ofile) || ret != 1) {
842                 perror("fwrite");
843                 clearerr(tip->ofile);
844                 return 1;
845         }
846
847         if (tip->ofile_stdout)
848                 fflush(tip->ofile);
849
850         return 0;
851 }
852
853 static int flush_subbuf_file(struct thread_information *tip,
854                              struct tip_subbuf *ts)
855 {
856         unsigned int offset = 0;
857         struct blk_io_trace *t;
858         int pdu_len, events = 0;
859
860         /*
861          * surplus from last run
862          */
863         if (tip->leftover_ts) {
864                 struct tip_subbuf *prev_ts = tip->leftover_ts;
865
866                 if (prev_ts->len + ts->len > prev_ts->max_len) {
867                         prev_ts->max_len += ts->len;
868                         prev_ts->buf = realloc(prev_ts->buf, prev_ts->max_len);
869                 }
870
871                 memcpy(prev_ts->buf + prev_ts->len, ts->buf, ts->len);
872                 prev_ts->len += ts->len;
873
874                 free(ts->buf);
875                 free(ts);
876
877                 ts = prev_ts;
878                 tip->leftover_ts = NULL;
879         }
880
881         while (offset + sizeof(*t) <= ts->len) {
882                 t = ts->buf + offset;
883
884                 if (verify_trace(t)) {
885                         write_data(tip, ts->buf, offset);
886                         return -1;
887                 }
888
889                 pdu_len = t->pdu_len;
890
891                 if (offset + sizeof(*t) + pdu_len > ts->len)
892                         break;
893
894                 offset += sizeof(*t) + pdu_len;
895                 tip->events_processed++;
896                 tip->data_read += sizeof(*t) + pdu_len;
897                 events++;
898         }
899
900         if (write_data(tip, ts->buf, offset))
901                 return -1;
902
903         /*
904          * leftover bytes, save them for next time
905          */
906         if (offset != ts->len) {
907                 tip->leftover_ts = ts;
908                 ts->len -= offset;
909                 memmove(ts->buf, ts->buf + offset, ts->len);
910         } else {
911                 free(ts->buf);
912                 free(ts);
913         }
914
915         return events;
916 }
917
918 static int write_tip_events(struct thread_information *tip)
919 {
920         struct tip_subbuf *ts = subbuf_fifo_dequeue(tip);
921
922         if (ts)
923                 return tip->flush_subbuf(tip, ts);
924
925         return 0;
926 }
927
928 /*
929  * scans the tips we know and writes out the subbuffers we accumulate
930  */
931 static void get_and_write_events(void)
932 {
933         struct device_information *dip;
934         struct thread_information *tip;
935         int i, j, events, ret, tips_running;
936
937         while (!is_done()) {
938                 events = 0;
939
940                 for_each_dip(dip, i) {
941                         for_each_tip(dip, tip, j) {
942                                 ret = write_tip_events(tip);
943                                 if (ret > 0)
944                                         events += ret;
945                         }
946                 }
947
948                 if (!events)
949                         usleep(100000);
950         }
951
952         /*
953          * reap stored events
954          */
955         do {
956                 events = 0;
957                 tips_running = 0;
958                 for_each_dip(dip, i) {
959                         for_each_tip(dip, tip, j) {
960                                 ret = write_tip_events(tip);
961                                 if (ret > 0)
962                                         events += ret;
963                                 tips_running += !tip->exited;
964                         }
965                 }
966                 usleep(10);
967         } while (events || tips_running);
968 }
969
970 static void wait_for_threads(void)
971 {
972         /*
973          * for piped or network output, poll and fetch data for writeout.
974          * for files, we just wait around for trace threads to exit
975          */
976         if ((output_name && !strcmp(output_name, "-")) ||
977             ((net_mode == Net_client) && !net_use_sendfile))
978                 get_and_write_events();
979         else {
980                 struct device_information *dip;
981                 struct thread_information *tip;
982                 int i, j, tips_running;
983
984                 do {
985                         tips_running = 0;
986                         usleep(100000);
987
988                         for_each_dip(dip, i)
989                                 for_each_tip(dip, tip, j)
990                                         tips_running += !tip->exited;
991                 } while (tips_running);
992         }
993
994         if (net_mode == Net_client)
995                 net_client_send_close();
996 }
997
/*
 * Build the output file path for one per-cpu trace file into @dst.
 *
 * Layout: [<output_dir>|.]/[<client-ip>-<connect-time>/]<name>.blktrace.<cpu>
 * where <name> is output_name (-o) if given, else @buts_name.  In server
 * mode a per-client, per-connection subdirectory is created on demand.
 *
 * Returns 0 on success, 1 on error (stat/mkdir failure).
 *
 * NOTE(review): @dst is assumed large enough for the full path (callers
 * pass a 128-byte buffer) -- the sprintf/strftime calls below are not
 * bounds-checked against the real buffer size; verify against callers.
 */
static int fill_ofname(struct device_information *dip,
		       struct thread_information *tip, char *dst,
		       char *buts_name)
{
	struct stat sb;
	int len = 0;

	/* directory prefix: -D argument, or the current directory */
	if (output_dir)
		len = sprintf(dst, "%s/", output_dir);
	else
		len = sprintf(dst, "./");

	if (net_mode == Net_server) {
		struct net_connection *nc = tip->nc;

		/* per-client subdir: <ip>-<YYYY-MM-DD-HH:MM:SS>/ */
		len += sprintf(dst + len, "%s-", inet_ntoa(nc->ch->cl_in_addr));
		len += strftime(dst + len, 64, "%F-%T/", gmtime(&dip->cl_connect_time));
	}

	/* create the directory component if it does not exist yet */
	if (stat(dst, &sb) < 0) {
		if (errno != ENOENT) {
			perror("stat");
			return 1;
		}
		if (mkdir(dst, 0755) < 0) {
			perror(dst);
			fprintf(stderr, "Can't make output dir\n");
			return 1;
		}
	}

	/* file name itself: -o name or the kernel buts name, plus cpu id */
	if (output_name)
		sprintf(dst + len, "%s.blktrace.%d", output_name, tip->cpu);
	else
		sprintf(dst + len, "%s.blktrace.%d", buts_name, tip->cpu);

	return 0;
}
1036
1037 static void fill_ops(struct thread_information *tip)
1038 {
1039         /*
1040          * setup ops
1041          */
1042         if (net_mode == Net_client) {
1043                 if (net_use_sendfile) {
1044                         tip->get_subbuf = get_subbuf_sendfile;
1045                         tip->flush_subbuf = NULL;
1046                 } else {
1047                         tip->get_subbuf = get_subbuf;
1048                         tip->flush_subbuf = flush_subbuf_net;
1049                 }
1050         } else {
1051                 if (tip->ofile_mmap)
1052                         tip->get_subbuf = mmap_subbuf;
1053                 else
1054                         tip->get_subbuf = get_subbuf;
1055
1056                 tip->flush_subbuf = flush_subbuf_file;
1057         }
1058                         
1059         if (net_mode == Net_server)
1060                 tip->read_data = read_data_net;
1061         else
1062                 tip->read_data = read_data_file;
1063 }
1064
1065 static int tip_open_output(struct device_information *dip,
1066                            struct thread_information *tip)
1067 {
1068         int pipeline = output_name && !strcmp(output_name, "-");
1069         int mode, vbuf_size;
1070         char op[128];
1071
1072         if (net_mode == Net_client) {
1073                 tip->ofile = NULL;
1074                 tip->ofile_stdout = 0;
1075                 tip->ofile_mmap = 0;
1076                 goto done;
1077         } else if (pipeline) {
1078                 tip->ofile = fdopen(STDOUT_FILENO, "w");
1079                 tip->ofile_stdout = 1;
1080                 tip->ofile_mmap = 0;
1081                 mode = _IOLBF;
1082                 vbuf_size = 512;
1083         } else {
1084                 if (fill_ofname(dip, tip, op, dip->buts_name))
1085                         return 1;
1086                 tip->ofile = fopen(op, "w+");
1087                 tip->ofile_stdout = 0;
1088                 tip->ofile_mmap = 1;
1089                 mode = _IOFBF;
1090                 vbuf_size = OFILE_BUF;
1091         }
1092
1093         if (tip->ofile == NULL) {
1094                 perror(op);
1095                 return 1;
1096         }
1097
1098         tip->ofile_buffer = malloc(vbuf_size);
1099         if (setvbuf(tip->ofile, tip->ofile_buffer, mode, vbuf_size)) {
1100                 perror("setvbuf");
1101                 close_thread(tip);
1102                 return 1;
1103         }
1104
1105 done:
1106         fill_ops(tip);
1107         return 0;
1108 }
1109
1110 static int start_threads(struct device_information *dip)
1111 {
1112         struct thread_information *tip;
1113         int j;
1114
1115         for_each_tip(dip, tip, j) {
1116                 tip->cpu = j;
1117                 tip->device = dip;
1118                 tip->events_processed = 0;
1119                 tip->fd = -1;
1120                 memset(&tip->fifo, 0, sizeof(tip->fifo));
1121                 tip->leftover_ts = NULL;
1122
1123                 if (tip_open_output(dip, tip))
1124                         return 1;
1125
1126                 if (pthread_create(&tip->thread, NULL, thread_main, tip)) {
1127                         perror("pthread_create");
1128                         close_thread(tip);
1129                         return 1;
1130                 }
1131         }
1132
1133         return 0;
1134 }
1135
1136 static void stop_threads(struct device_information *dip)
1137 {
1138         struct thread_information *tip;
1139         unsigned long ret;
1140         int i;
1141
1142         for_each_tip(dip, tip, i) {
1143                 (void) pthread_join(tip->thread, (void *) &ret);
1144                 close_thread(tip);
1145         }
1146 }
1147
1148 static void stop_all_threads(void)
1149 {
1150         struct device_information *dip;
1151         int i;
1152
1153         for_each_dip(dip, i)
1154                 stop_threads(dip);
1155 }
1156
1157 static void stop_all_tracing(void)
1158 {
1159         struct device_information *dip;
1160         int i;
1161
1162         for_each_dip(dip, i)
1163                 stop_trace(dip);
1164 }
1165
1166 static void exit_trace(int status)
1167 {
1168         if (!is_trace_stopped()) {
1169                 trace_stopped = 1;
1170                 stop_all_threads();
1171                 stop_all_tracing();
1172         }
1173
1174         exit(status);
1175 }
1176
1177 static int resize_devices(char *path)
1178 {
1179         int size = (ndevs + 1) * sizeof(struct device_information);
1180
1181         device_information = realloc(device_information, size);
1182         if (!device_information) {
1183                 fprintf(stderr, "Out of memory, device %s (%d)\n", path, size);
1184                 return 1;
1185         }
1186         device_information[ndevs].path = path;
1187         ndevs++;
1188         return 0;
1189 }
1190
1191 static int open_devices(void)
1192 {
1193         struct device_information *dip;
1194         int i;
1195
1196         for_each_dip(dip, i) {
1197                 dip->fd = open(dip->path, O_RDONLY | O_NONBLOCK);
1198                 if (dip->fd < 0) {
1199                         perror(dip->path);
1200                         return 1;
1201                 }
1202                 dip->buf_size = buf_size;
1203                 dip->buf_nr = buf_nr;
1204                 dip->page_size = page_size;
1205         }
1206
1207         return 0;
1208 }
1209
/*
 * Allocate the global per-cpu thread table and start tracing on all
 * devices in two stages: first arm kernel tracing per device, then hand
 * each device its slice of the thread table and spawn worker threads.
 * If either stage fails part-way, everything already started is unwound.
 * Returns 0 on success, 1 on failure.
 */
static int start_devices(void)
{
	struct device_information *dip;
	int i, j, size;

	size = ncpus * sizeof(struct thread_information);
	thread_information = malloc(size * ndevs);
	if (!thread_information) {
		fprintf(stderr, "Out of memory, threads (%d)\n", size * ndevs);
		return 1;
	}

	/* stage 1: arm kernel-side tracing on each device */
	for_each_dip(dip, i) {
		if (start_trace(dip)) {
			close(dip->fd);
			fprintf(stderr, "Failed to start trace on %s\n",
				dip->path);
			break;
		}
	}

	/* partial failure: stop tracing on the devices already started */
	if (i != ndevs) {
		__for_each_dip(dip, device_information, i, j)
			stop_trace(dip);

		return 1;
	}

	/* stage 2: give each device its slice of the thread table and run */
	for_each_dip(dip, i) {
		dip->threads = thread_information + (i * ncpus);
		if (start_threads(dip)) {
			fprintf(stderr, "Failed to start worker threads\n");
			break;
		}
	}

	/* partial failure: unwind threads started so far, then all tracing */
	if (i != ndevs) {
		__for_each_dip(dip, device_information, i, j)
			stop_threads(dip);
		for_each_dip(dip, i)
			stop_trace(dip);

		return 1;
	}

	return 0;
}
1257
1258 static void show_stats(struct device_information *dips, int ndips, int cpus)
1259 {
1260         struct device_information *dip;
1261         struct thread_information *tip;
1262         unsigned long long events_processed, data_read;
1263         unsigned long total_drops;
1264         int i, j, no_stdout = 0;
1265
1266         if (is_stat_shown())
1267                 return;
1268
1269         if (output_name && !strcmp(output_name, "-"))
1270                 no_stdout = 1;
1271
1272         stat_shown = 1;
1273
1274         total_drops = 0;
1275         __for_each_dip(dip, dips, ndips, i) {
1276                 if (!no_stdout)
1277                         printf("Device: %s\n", dip->path);
1278                 events_processed = 0;
1279                 data_read = 0;
1280                 __for_each_tip(dip, tip, cpus, j) {
1281                         if (!no_stdout)
1282                                 printf("  CPU%3d: %20lu events, %8llu KiB data\n",
1283                                         tip->cpu, tip->events_processed,
1284                                         (tip->data_read + 1023) >> 10);
1285                         events_processed += tip->events_processed;
1286                         data_read += tip->data_read;
1287                 }
1288                 total_drops += dip->drop_count;
1289                 if (!no_stdout)
1290                         printf("  Total:  %20llu events (dropped %lu), %8llu KiB data\n",
1291                                         events_processed, dip->drop_count,
1292                                         (data_read + 1023) >> 10);
1293         }
1294
1295         if (total_drops)
1296                 fprintf(stderr, "You have dropped events, consider using a larger buffer size (-b)\n");
1297 }
1298
/*
 * Look up (or create) the server-side device state matching an incoming
 * net header.  On first sight of a device the per-cpu thread slots are
 * allocated and all output files are opened.
 *
 * Returns the device entry, or NULL if output setup failed for a newly
 * created device.  NOTE(review): callers must handle the NULL case;
 * the realloc/malloc results below are unchecked -- OOM here would
 * crash rather than fail gracefully.
 */
static struct device_information *net_get_dip(struct net_connection *nc,
					      struct blktrace_net_hdr *bnh)
{
	struct device_information *dip, *cl_dip = NULL;
	struct cl_host *ch = nc->ch;
	int i;

	/* existing device for this client host? match by buts name */
	for (i = 0; i < ch->ndevs; i++) {
		dip = &ch->device_information[i];

		if (!strcmp(dip->buts_name, bnh->buts_name))
			return dip;

		/* remember any device from the same client run (cl_id) */
		if (dip->cl_id == bnh->cl_id)
			cl_dip = dip;
	}

	/* new device: extend the table, geometry comes from the header */
	ch->device_information = realloc(ch->device_information, (ch->ndevs + 1) * sizeof(*dip));
	dip = &ch->device_information[ch->ndevs];
	memset(dip, 0, sizeof(*dip));
	dip->fd = -1;
	dip->ch = ch;
	dip->cl_id = bnh->cl_id;
	dip->buf_size = bnh->buf_size;
	dip->buf_nr = bnh->buf_nr;
	dip->page_size = bnh->page_size;

	/* share the connect time with other devices of the same run */
	if (cl_dip)
		dip->cl_connect_time = cl_dip->cl_connect_time;
	else
		dip->cl_connect_time = nc->connect_time;
	strcpy(dip->buts_name, bnh->buts_name);
	dip->path = strdup(bnh->buts_name);
	dip->trace_started = 1;
	ch->ndevs++;
	dip->threads = malloc(nc->ncpus * sizeof(struct thread_information));
	memset(dip->threads, 0, nc->ncpus * sizeof(struct thread_information));

	/*
	 * open all files
	 */
	for (i = 0; i < nc->ncpus; i++) {
		struct thread_information *tip = &dip->threads[i];

		tip->cpu = i;
		tip->device = dip;
		tip->fd = -1;
		/* nc is only needed while fill_ofname() builds the path */
		tip->nc = nc;

		if (tip_open_output(dip, tip))
			return NULL;

		tip->nc = NULL;
	}

	return dip;
}
1356
1357 static struct thread_information *net_get_tip(struct net_connection *nc,
1358                                               struct blktrace_net_hdr *bnh)
1359 {
1360         struct device_information *dip;
1361         struct thread_information *tip;
1362
1363         dip = net_get_dip(nc, bnh);
1364         if (!dip->trace_started) {
1365                 fprintf(stderr, "Events for closed devices %s\n", dip->buts_name);
1366                 return NULL;
1367         }
1368
1369         tip = &dip->threads[bnh->cpu];
1370         if (!tip->nc)
1371                 tip->nc = nc;
1372         
1373         return tip;
1374 }
1375
1376 static int net_get_header(struct net_connection *nc,
1377                           struct blktrace_net_hdr *bnh)
1378 {
1379         int fl = fcntl(nc->in_fd, F_GETFL);
1380         int bytes_left, ret;
1381         void *p = bnh;
1382
1383         fcntl(nc->in_fd, F_SETFL, fl | O_NONBLOCK);
1384         bytes_left = sizeof(*bnh);
1385         while (bytes_left && !is_done()) {
1386                 ret = recv(nc->in_fd, p, bytes_left, MSG_WAITALL);
1387                 if (ret < 0) {
1388                         if (errno != EAGAIN) {
1389                                 perror("recv header");
1390                                 return 1;
1391                         }
1392                         usleep(1000);
1393                         continue;
1394                 } else if (!ret) {
1395                         usleep(1000);
1396                         continue;
1397                 } else {
1398                         p += ret;
1399                         bytes_left -= ret;
1400                 }
1401         }
1402         fcntl(nc->in_fd, F_SETFL, fl & ~O_NONBLOCK);
1403         return bytes_left;
1404 }
1405
1406 /*
1407  * finalize a net client: truncate files, show stats, cleanup, etc
1408  */
1409 static void device_done(struct net_connection *nc, struct device_information *dip)
1410 {
1411         struct thread_information *tip;
1412         int i;
1413
1414         __for_each_tip(dip, tip, nc->ncpus, i)
1415                 tip_ftrunc_final(tip);
1416
1417         show_stats(dip, 1, nc->ncpus);
1418
1419         /*
1420          * cleanup for next run
1421          */
1422         __for_each_tip(dip, tip, nc->ncpus, i) {
1423                 if (tip->ofile)
1424                         fclose(tip->ofile);
1425         }
1426
1427         free(dip->threads);
1428         free(dip->path);
1429
1430         close(nc->in_fd);
1431         nc->in_fd = -1;
1432
1433         stat_shown = 0;
1434 }
1435
/*
 * Compare two IPv4 addresses for equality (1 if equal, 0 otherwise).
 */
static inline int in_addr_eq(struct in_addr a, struct in_addr b)
{
	return !(a.s_addr ^ b.s_addr);
}
1440
1441 static void net_add_client_host(struct cl_host *ch)
1442 {
1443         ch->list_next = cl_host_list;
1444         cl_host_list = ch;
1445         cl_hosts++;
1446 }
1447
1448 static void net_remove_client_host(struct cl_host *ch)
1449 {
1450         struct cl_host *p, *c;
1451         
1452         for (p = c = cl_host_list; c; c = c->list_next) {
1453                 if (c == ch) {
1454                         if (p == c)
1455                                 cl_host_list = c->list_next;
1456                         else
1457                                 p->list_next = c->list_next;
1458                         cl_hosts--;
1459                         return;
1460                 }
1461                 p = c;
1462         }
1463 }
1464
1465 static struct cl_host *net_find_client_host(struct in_addr cl_in_addr)
1466 {
1467         struct cl_host *ch = cl_host_list;
1468
1469         while (ch) {
1470                 if (in_addr_eq(ch->cl_in_addr, cl_in_addr))
1471                         return ch;
1472                 ch = ch->list_next;
1473         }
1474
1475         return NULL;
1476 }
1477
1478 static void net_client_host_done(struct cl_host *ch)
1479 {
1480         free(ch->device_information);
1481         free(ch->net_connections);
1482         net_connects -= ch->nconn;
1483         net_remove_client_host(ch);
1484         free(ch);
1485 }
1486
1487 /*
1488  * handle incoming events from a net client
1489  */
1490 static int net_client_data(struct net_connection *nc)
1491 {
1492         struct thread_information *tip;
1493         struct blktrace_net_hdr bnh;
1494
1495         if (net_get_header(nc, &bnh))
1496                 return 1;
1497
1498         if (data_is_native == -1 && check_data_endianness(bnh.magic)) {
1499                 fprintf(stderr, "server: received data is bad\n");
1500                 return 1;
1501         }
1502
1503         if (!data_is_native) {
1504                 bnh.magic = be32_to_cpu(bnh.magic);
1505                 bnh.cpu = be32_to_cpu(bnh.cpu);
1506                 bnh.max_cpus = be32_to_cpu(bnh.max_cpus);
1507                 bnh.len = be32_to_cpu(bnh.len);
1508                 bnh.cl_id = be32_to_cpu(bnh.cl_id);
1509                 bnh.buf_size = be32_to_cpu(bnh.buf_size);
1510                 bnh.buf_nr = be32_to_cpu(bnh.buf_nr);
1511                 bnh.page_size = be32_to_cpu(bnh.page_size);
1512         }
1513
1514         if ((bnh.magic & 0xffffff00) != BLK_IO_TRACE_MAGIC) {
1515                 fprintf(stderr, "server: bad data magic\n");
1516                 return 1;
1517         }
1518
1519         if (nc->ncpus == -1)
1520                 nc->ncpus = bnh.max_cpus;
1521
1522         /*
1523          * len == 0 means that the other end signalled end-of-run
1524          */
1525         if (!bnh.len) {
1526                 /*
1527                  * overload cpu count with dropped events
1528                  */
1529                 struct device_information *dip;
1530
1531                 dip = net_get_dip(nc, &bnh);
1532                 dip->drop_count = bnh.cpu;
1533                 dip->trace_started = 0;
1534
1535                 printf("server: end of run for %s\n", dip->buts_name);
1536
1537                 device_done(nc, dip);
1538
1539                 if (++nc->ch->ndevs_done == nc->ch->ndevs)
1540                         net_client_host_done(nc->ch);
1541
1542                 return 0;
1543         }
1544
1545         tip = net_get_tip(nc, &bnh);
1546         if (!tip)
1547                 return 1;
1548
1549         if (mmap_subbuf(tip, bnh.len))
1550                 return 1;
1551
1552         return 0;
1553 }
1554
1555 static void net_add_connection(int listen_fd, struct sockaddr_in *addr)
1556 {
1557         socklen_t socklen = sizeof(*addr);
1558         struct net_connection *nc;
1559         struct cl_host *ch;
1560         int in_fd;
1561
1562         in_fd = accept(listen_fd, (struct sockaddr *) addr, &socklen);
1563         if (in_fd < 0) {
1564                 perror("accept");
1565                 return;
1566         }
1567
1568         ch = net_find_client_host(addr->sin_addr);
1569         if (!ch) {
1570                 if (cl_hosts == NET_MAX_CL_HOSTS) {
1571                         fprintf(stderr, "server: no more clients allowed\n");
1572                         return;
1573                 }
1574                 ch = malloc(sizeof(struct cl_host));
1575                 memset(ch, 0, sizeof(*ch));
1576                 ch->cl_in_addr = addr->sin_addr;
1577                 net_add_client_host(ch);
1578
1579                 printf("server: connection from %s\n", inet_ntoa(addr->sin_addr));
1580         }
1581
1582         ch->net_connections = realloc(ch->net_connections, (ch->nconn + 1) * sizeof(*nc));
1583         nc = &ch->net_connections[ch->nconn++];
1584         memset(nc, 0, sizeof(*nc));
1585
1586         time(&nc->connect_time);
1587         nc->ch = ch;
1588         nc->in_fd = in_fd;
1589         nc->ncpus = -1;
1590         net_connects++;
1591 }
1592
1593 /*
1594  * event driven loop, handle new incoming connections and data from
1595  * existing connections
1596  */
1597 static void net_server_handle_connections(int listen_fd,
1598                                           struct sockaddr_in *addr)
1599 {
1600         struct pollfd *pfds = NULL;
1601         struct net_connection **ncs = NULL;
1602         int max_connects = 0;
1603         int i, nconns, events;
1604         struct cl_host *ch;
1605         struct net_connection *nc;
1606         
1607         printf("server: waiting for connections...\n");
1608
1609         while (!is_done()) {
1610                 if (net_connects >= max_connects) {
1611                         pfds = realloc(pfds, (net_connects + 1) * sizeof(*pfds));
1612                         ncs = realloc(ncs, (net_connects + 1) * sizeof(*ncs));
1613                         max_connects = net_connects + 1;
1614                 }
1615                 /*
1616                  * the zero entry is for incoming connections, remaining
1617                  * entries for clients
1618                  */
1619                 pfds[0].fd = listen_fd;
1620                 pfds[0].events = POLLIN;
1621                 nconns = 0;
1622                 for_each_cl_host(ch) {
1623                         for (i = 0; i < ch->nconn; i++) {
1624                                 nc = &ch->net_connections[i];
1625                                 pfds[nconns + 1].fd = nc->in_fd;
1626                                 pfds[nconns + 1].events = POLLIN;
1627                                 ncs[nconns++] = nc;
1628                         }
1629                 }
1630
1631                 events = poll(pfds, 1 + nconns, -1);
1632                 if (events < 0) {
1633                         if (errno == EINTR)
1634                                 continue;
1635
1636                         perror("poll");
1637                         break;
1638                 } else if (!events)
1639                         continue;
1640
1641                 if (pfds[0].revents & POLLIN) {
1642                         net_add_connection(listen_fd, addr);
1643                         events--;
1644                 }
1645
1646                 for (i = 0; events && i < nconns; i++) {
1647                         if (pfds[i + 1].revents & POLLIN) {
1648                                 net_client_data(ncs[i]);
1649                                 events--;
1650                         }
1651                 }
1652         }
1653 }
1654
1655 /*
1656  * Start here when we are in server mode - just fetch data from the network
1657  * and dump to files
1658  */
1659 static int net_server(void)
1660 {
1661         struct sockaddr_in addr;
1662         int fd, opt;
1663
1664         fd = socket(AF_INET, SOCK_STREAM, 0);
1665         if (fd < 0) {
1666                 perror("server: socket");
1667                 return 1;
1668         }
1669
1670         opt = 1;
1671         if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) {
1672                 perror("setsockopt");
1673                 return 1;
1674         }
1675
1676         memset(&addr, 0, sizeof(addr));
1677         addr.sin_family = AF_INET;
1678         addr.sin_addr.s_addr = htonl(INADDR_ANY);
1679         addr.sin_port = htons(net_port);
1680
1681         if (bind(fd, (struct sockaddr *) &addr, sizeof(addr)) < 0) {
1682                 perror("bind");
1683                 return 1;
1684         }
1685
1686         if (listen(fd, 1) < 0) {
1687                 perror("listen");
1688                 return 1;
1689         }
1690
1691         net_server_handle_connections(fd, &addr);
1692         return 0;
1693 }
1694
1695 /*
1696  * Setup outgoing network connection where we will transmit data
1697  */
1698 static int net_setup_client_cpu(int i, struct sockaddr_in *addr)
1699 {
1700         int fd;
1701
1702         fd = socket(AF_INET, SOCK_STREAM, 0);
1703         if (fd < 0) {
1704                 perror("client: socket");
1705                 return 1;
1706         }
1707
1708         if (connect(fd, (struct sockaddr *) addr, sizeof(*addr)) < 0) {
1709                 perror("client: connect");
1710                 return 1;
1711         }
1712
1713         net_out_fd[i] = fd;
1714         return 0;
1715 }
1716
1717 static int net_setup_client(void)
1718 {
1719         struct sockaddr_in addr;
1720         int i;
1721
1722         memset(&addr, 0, sizeof(addr));
1723         addr.sin_family = AF_INET;
1724         addr.sin_port = htons(net_port);
1725
1726         if (inet_aton(hostname, &addr.sin_addr) != 1) {
1727                 struct hostent *hent = gethostbyname(hostname);
1728                 if (!hent) {
1729                         perror("gethostbyname");
1730                         return 1;
1731                 }
1732
1733                 memcpy(&addr.sin_addr, hent->h_addr, 4);
1734                 strcpy(hostname, hent->h_name);
1735         }
1736
1737         printf("blktrace: connecting to %s\n", hostname);
1738
1739         net_out_fd = malloc(ncpus * sizeof(*net_out_fd));
1740         for (i = 0; i < ncpus; i++) {
1741                 if (net_setup_client_cpu(i, &addr))
1742                         return 1;
1743         }
1744
1745         printf("blktrace: connected!\n");
1746         
1747         return 0;
1748 }
1749
1750 static char usage_str[] = \
1751         "-d <dev> [ -r debugfs path ] [ -o <output> ] [-k ] [ -w time ]\n" \
1752         "[ -a action ] [ -A action mask ] [ -v ]\n\n" \
1753         "\t-d Use specified device. May also be given last after options\n" \
1754         "\t-r Path to mounted debugfs, defaults to /debug\n" \
1755         "\t-o File(s) to send output to\n" \
1756         "\t-D Directory to prepend to output file names\n" \
1757         "\t-k Kill a running trace\n" \
1758         "\t-w Stop after defined time, in seconds\n" \
1759         "\t-a Only trace specified actions. See documentation\n" \
1760         "\t-A Give trace mask as a single value. See documentation\n" \
1761         "\t-b Sub buffer size in KiB\n" \
1762         "\t-n Number of sub buffers\n" \
1763         "\t-l Run in network listen mode (blktrace server)\n" \
1764         "\t-h Run in network client mode, connecting to the given host\n" \
1765         "\t-p Network port to use (default 8462)\n" \
1766         "\t-s Make the network client NOT use sendfile() to transfer data\n" \
1767         "\t-V Print program version info\n\n";
1768
1769 static void show_usage(char *program)
1770 {
1771         fprintf(stderr, "Usage: %s %s %s",program, blktrace_version, usage_str);
1772 }
1773
/*
 * Parse options, validate the environment (debugfs mount, devices),
 * then run in one of three modes: network server (-l), network client
 * (-h), or local tracing.  Returns 0 on success, non-zero on error.
 */
int main(int argc, char *argv[])
{
	static char default_debugfs_path[] = "/sys/kernel/debug";
	struct statfs st;
	int i, c;
	int stop_watch = 0;
	int act_mask_tmp = 0;

	while ((c = getopt_long(argc, argv, S_OPTS, l_opts, NULL)) >= 0) {
		switch (c) {
		case 'a':
			i = find_mask_map(optarg);
			if (i < 0) {
				fprintf(stderr,"Invalid action mask %s\n",
					optarg);
				return 1;
			}
			act_mask_tmp |= i;
			break;

		case 'A':
			if ((sscanf(optarg, "%x", &i) != 1) || 
							!valid_act_opt(i)) {
				fprintf(stderr,
					"Invalid set action mask %s/0x%x\n",
					optarg, i);
				return 1;
			}
			act_mask_tmp = i;
			break;

		case 'd':
			if (resize_devices(optarg) != 0)
				return 1;
			break;

		case 'r':
			debugfs_path = optarg;
			break;

		case 'o':
			output_name = optarg;
			break;
		case 'k':
			kill_running_trace = 1;
			break;
		case 'w':
			stop_watch = atoi(optarg);
			if (stop_watch <= 0) {
				fprintf(stderr,
					"Invalid stopwatch value (%d secs)\n",
					stop_watch);
				return 1;
			}
			break;
		case 'V':
			printf("%s version %s\n", argv[0], blktrace_version);
			return 0;
		case 'b':
			/* sub-buffer size is given in KiB, stored in bytes */
			buf_size = strtoul(optarg, NULL, 10);
			if (buf_size <= 0 || buf_size > 16*1024) {
				fprintf(stderr,
					"Invalid buffer size (%lu)\n",buf_size);
				return 1;
			}
			buf_size <<= 10;
			break;
		case 'n':
			buf_nr = strtoul(optarg, NULL, 10);
			if (buf_nr <= 0) {
				fprintf(stderr,
					"Invalid buffer nr (%lu)\n", buf_nr);
				return 1;
			}
			break;
		case 'D':
			output_dir = optarg;
			break;
		case 'h':
			net_mode = Net_client;
			strcpy(hostname, optarg);
			break;
		case 'l':
			net_mode = Net_server;
			break;
		case 'p':
			net_port = atoi(optarg);
			break;
		case 's':
			net_use_sendfile = 0;
			break;
		default:
			show_usage(argv[0]);
			return 1;
		}
	}

	setlocale(LC_NUMERIC, "en_US");

	page_size = getpagesize();

	/* server mode: file names come from the clients, -o is meaningless */
	if (net_mode == Net_server) {
		if (output_name) {
			fprintf(stderr, "-o ignored in server mode\n");
			output_name = NULL;
		}

		return net_server();
	}

	/* remaining non-option arguments are additional device paths */
	while (optind < argc) {
		if (resize_devices(argv[optind++]) != 0)
			return 1;
	}

	if (ndevs == 0) {
		show_usage(argv[0]);
		return 1;
	}

	if (act_mask_tmp != 0)
		act_mask = act_mask_tmp;

	if (!debugfs_path)
		debugfs_path = default_debugfs_path;

	/* sanity check that debugfs is really mounted where we expect it */
	if (statfs(debugfs_path, &st) < 0) {
		perror("statfs");
		fprintf(stderr,"%s does not appear to be a valid path\n",
			debugfs_path);
		return 1;
	} else if (st.f_type != (long) DEBUGFS_TYPE) {
		fprintf(stderr,"%s does not appear to be a debug filesystem\n",
			debugfs_path);
		return 1;
	}

	if (open_devices() != 0)
		return 1;

	/* -k: just stop any trace already running and exit */
	if (kill_running_trace) {
		stop_all_traces();
		return 0;
	}

	ncpus = sysconf(_SC_NPROCESSORS_ONLN);
	if (ncpus < 0) {
		fprintf(stderr, "sysconf(_SC_NPROCESSORS_ONLN) failed\n");
		return 1;
	}

	signal(SIGINT, handle_sigint);
	signal(SIGHUP, handle_sigint);
	signal(SIGTERM, handle_sigint);
	signal(SIGALRM, handle_sigint);
	signal(SIGPIPE, SIG_IGN);

	if (net_mode == Net_client && net_setup_client())
		return 1;

	if (start_devices() != 0)
		return 1;

	atexit(stop_all_tracing);

	/* -w: arm the stopwatch; SIGALRM triggers the normal shutdown path */
	if (stop_watch)
		alarm(stop_watch);

	wait_for_threads();

	if (!is_trace_stopped()) {
		trace_stopped = 1;
		stop_all_threads();
		stop_all_traces();
	}

	show_stats(device_information, ndevs, ncpus);

	return 0;
}
1954