[PATCH] fread/fwrite error handling
[blktrace.git] / blktrace.c
1 /*
2  * block queue tracing application
3  *
4  * Copyright (C) 2005 Jens Axboe <axboe@suse.de>
5  *
6  *  This program is free software; you can redistribute it and/or modify
7  *  it under the terms of the GNU General Public License as published by
8  *  the Free Software Foundation; either version 2 of the License, or
9  *  (at your option) any later version.
10  *
11  *  This program is distributed in the hope that it will be useful,
12  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
13  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  *  GNU General Public License for more details.
15  *
16  *  You should have received a copy of the GNU General Public License
17  *  along with this program; if not, write to the Free Software
18  *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19  *
20  */
21 #include <pthread.h>
22 #include <sys/types.h>
23 #include <sys/stat.h>
24 #include <unistd.h>
25 #include <locale.h>
26 #include <signal.h>
27 #include <fcntl.h>
28 #include <string.h>
29 #include <sys/ioctl.h>
30 #include <sys/param.h>
31 #include <sys/statfs.h>
32 #include <sys/poll.h>
33 #include <sys/mman.h>
34 #include <sys/socket.h>
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <sched.h>
38 #include <ctype.h>
39 #include <getopt.h>
40 #include <errno.h>
41 #include <netinet/in.h>
42 #include <arpa/inet.h>
43 #include <netdb.h>
44 #include <sys/sendfile.h>
45
46 #include "blktrace.h"
47 #include "barrier.h"
48
/* program version printed for -V/--version */
static char blktrace_version[] = "0.99.1";

/*
 * You may want to increase this even more, if you are logging at a high
 * rate and see skipped/missed events
 */
#define BUF_SIZE	(512 * 1024)	/* default relay sub-buffer size */
#define BUF_NR		(4)		/* default number of sub-buffers */

/* stdio output buffer size for the per-cpu trace files */
#define OFILE_BUF	(128 * 1024)

/* statfs() f_type of a mounted debugfs */
#define DEBUGFS_TYPE	0x64626720

/* getopt short-option string; must stay in sync with l_opts[] below */
#define S_OPTS	"d:a:A:r:o:kw:Vb:n:D:lh:p:s"
static struct option l_opts[] = {
	{
		.name = "dev",
		.has_arg = required_argument,
		.flag = NULL,
		.val = 'd'
	},
	{
		.name = "act-mask",
		.has_arg = required_argument,
		.flag = NULL,
		.val = 'a'
	},
	{
		.name = "set-mask",
		.has_arg = required_argument,
		.flag = NULL,
		.val = 'A'
	},
	{
		.name = "relay",
		.has_arg = required_argument,
		.flag = NULL,
		.val = 'r'
	},
	{
		.name = "output",
		.has_arg = required_argument,
		.flag = NULL,
		.val = 'o'
	},
	{
		.name = "kill",
		.has_arg = no_argument,
		.flag = NULL,
		.val = 'k'
	},
	{
		.name = "stopwatch",
		.has_arg = required_argument,
		.flag = NULL,
		.val = 'w'
	},
	{
		.name = "version",
		.has_arg = no_argument,
		.flag = NULL,
		.val = 'V'
	},
	{
		.name = "buffer-size",
		.has_arg = required_argument,
		.flag = NULL,
		.val = 'b'
	},
	{
		.name = "num-sub-buffers",
		.has_arg = required_argument,
		.flag = NULL,
		.val = 'n'
	},
	{
		.name = "output-dir",
		.has_arg = required_argument,
		.flag = NULL,
		.val = 'D'
	},
	{
		.name = "listen",
		.has_arg = no_argument,
		.flag = NULL,
		.val = 'l'
	},
	{
		.name = "host",
		.has_arg = required_argument,
		.flag = NULL,
		.val = 'h'
	},
	{
		.name = "port",
		.has_arg = required_argument,
		.flag = NULL,
		.val = 'p'
	},
	{
		.name = "no-sendfile",
		.has_arg = no_argument,
		.flag = NULL,
		.val = 's'
	},
	{
		.name = NULL,
	}
};
158
/*
 * One captured chunk of trace data, passed from a per-cpu reader thread
 * to the writer via the fifo below.
 */
struct tip_subbuf {
	void *buf;		/* malloc'ed data */
	unsigned int len;	/* valid bytes in buf */
	unsigned int max_len;	/* allocated size of buf */
};

#define FIFO_SIZE	(1024)	/* should be plenty big! */
#define CL_SIZE		(128)	/* cache line, any bigger? */

/*
 * Single-producer/single-consumer ring of sub-buffers. head and tail
 * are aligned to (presumed) cache-line size to avoid false sharing
 * between the reader and writer threads.
 */
struct tip_subbuf_fifo {
	int tail __attribute__((aligned(CL_SIZE)));
	int head __attribute__((aligned(CL_SIZE)));
	struct tip_subbuf *q[FIFO_SIZE];
};
173
/*
 * Per-cpu reader-thread state; one per cpu for every traced device.
 */
struct thread_information {
	int cpu;			/* cpu this thread handles */
	pthread_t thread;

	int fd;				/* relay trace file descriptor */
	void *fd_buf;
	char fn[MAXPATHLEN + 64];	/* relay trace file path */

	FILE *ofile;			/* output stream (file or stdout) */
	char *ofile_buffer;		/* stdio buffer; presumably handed to setvbuf — confirm in tip_open_output */
	off_t ofile_offset;
	int ofile_stdout;		/* nonzero when writing to stdout */
	int ofile_mmap;			/* nonzero when using mmap'ed file output */

	/* strategy hooks, selected in fill_ops() */
	int (*get_subbuf)(struct thread_information *, unsigned int);
	int (*flush_subbuf)(struct thread_information *, struct tip_subbuf *);
	int (*read_data)(struct thread_information *, void *, unsigned int);

	unsigned long events_processed;
	unsigned long long data_read;
	unsigned long long data_queued;
	struct device_information *device;

	int exited;			/* set by thread_main() just before return */

	/*
	 * piped fifo buffers
	 */
	struct tip_subbuf_fifo fifo;
	struct tip_subbuf *leftover_ts;	/* partial trailing event kept between flushes */

	/*
	 * mmap controlled output files
	 */
	unsigned long long fs_size;	/* bytes of trace data written so far */
	unsigned long long fs_max_size;	/* size the file is currently truncated to */
	unsigned long fs_off;		/* write offset inside the current mapping */
	void *fs_buf;			/* current mmap'ed window, NULL when none */
	unsigned long fs_buf_len;	/* length of the fs_buf mapping */

	struct net_connection *nc;	/* server mode: connection we read from */
};

/*
 * Per-device trace state.
 */
struct device_information {
	int fd;				/* block device fd */
	char *path;
	char buts_name[32];		/* trace name returned by BLKTRACESETUP */
	volatile int trace_started;	/* accessed via dip_tracing()/dip_set_tracing() */
	unsigned long drop_count;	/* dropped-event count, sampled at shutdown */
	struct thread_information *threads;	/* one per cpu */
	unsigned long buf_size;		/* relay sub-buffer size for this device */
	unsigned long buf_nr;		/* relay sub-buffer count for this device */
	unsigned int page_size;

	/* server-mode bookkeeping */
	struct cl_host *ch;
	u32 cl_id;
	time_t cl_connect_time;
};
232
static int ncpus;			/* number of cpus == threads per device */
static struct thread_information *thread_information;
static int ndevs;			/* number of devices being traced */
static struct device_information *device_information;

/* command line option globals */
static char *debugfs_path;
static char *output_name;
static char *output_dir;
static int act_mask = ~0U;		/* default: trace all actions */
static int kill_running_trace;
static unsigned long buf_size = BUF_SIZE;
static unsigned long buf_nr = BUF_NR;
static unsigned int page_size;

/*
 * run-state flags; the volatile casts in the accessor macros force a
 * fresh load each time, since they are updated from signal handlers
 * and other threads
 */
#define is_done()	(*(volatile int *)(&done))
static volatile int done;

#define is_trace_stopped()	(*(volatile int *)(&trace_stopped))
static volatile int trace_stopped;

#define is_stat_shown()	(*(volatile int *)(&stat_shown))
static volatile int stat_shown;

int data_is_native = -1;

static void exit_trace(int status);

#define dip_tracing(dip)	(*(volatile int *)(&(dip)->trace_started))
#define dip_set_tracing(dip, v)	((dip)->trace_started = (v))

/* iterate over devices / per-cpu threads / connected client hosts */
#define __for_each_dip(__d, __di, __e, __i)	\
	for (__i = 0, __d = __di; __i < __e; __i++, __d++)

#define for_each_dip(__d, __i)		\
	__for_each_dip(__d, device_information, ndevs, __i)
#define for_each_nc_dip(__nc, __d, __i)		\
	__for_each_dip(__d, (__nc)->ch->device_information, (__nc)->ch->ndevs, __i)

#define __for_each_tip(__d, __t, __ncpus, __j)	\
	for (__j = 0, __t = (__d)->threads; __j < __ncpus; __j++, __t++)
#define for_each_tip(__d, __t, __j)	\
	__for_each_tip(__d, __t, ncpus, __j)
#define for_each_cl_host(__c)	\
	for (__c = cl_host_list; __c; __c = __c->list_next)
/*
 * networking stuff follows. we include a magic number so we know whether
 * to endianness convert or not
 */
struct blktrace_net_hdr {
	u32 magic;		/* same as trace magic */
	char buts_name[32];	/* trace name */
	u32 cpu;		/* for which cpu */
	u32 max_cpus;
	u32 len;		/* length of following trace data */
	u32 cl_id;		/* id for set of client per-cpu connections */
	u32 buf_size;		/* client buf_size for this trace  */
	u32 buf_nr;		/* client buf_nr for this trace  */
	u32 page_size;		/* client page_size for this trace  */
};

#define TRACE_NET_PORT		(8462)	/* default port for both ends */

enum {
	Net_none = 0,		/* no networking */
	Net_server,		/* --listen: receive traces */
	Net_client,		/* --host: send traces */
};

/*
 * network cmd line params
 */
static char hostname[MAXHOSTNAMELEN];	/* --host argument */
static int net_port = TRACE_NET_PORT;	/* --port argument */
static int net_mode = 0;		/* Net_none, Net_server or Net_client */
static int net_use_sendfile = 1;	/* cleared by --no-sendfile */

/*
 * server-side state for one connected client host
 */
struct cl_host {
	struct cl_host *list_next;
	struct in_addr cl_in_addr;
	struct net_connection *net_connections;
	int nconn;
	struct device_information *device_information;
	int ndevs;
	int ncpus;
	int ndevs_done;
};

/*
 * one accepted connection on the server side
 */
struct net_connection {
	int in_fd;		/* accepted socket */
	struct pollfd pfd;
	time_t connect_time;
	struct cl_host *ch;	/* owning client host */
	int ncpus;
};

#define NET_MAX_CL_HOSTS	(1024)
static struct cl_host *cl_host_list;	/* all currently connected hosts */
static int cl_hosts;
static int net_connects;

static int *net_out_fd;		/* client side: per-cpu outgoing sockets */
336
337 static void handle_sigint(__attribute__((__unused__)) int sig)
338 {
339         struct device_information *dip;
340         int i;
341
342         /*
343          * stop trace so we can reap currently produced data
344          */
345         for_each_dip(dip, i) {
346                 if (dip->fd == -1)
347                         continue;
348                 if (ioctl(dip->fd, BLKTRACESTOP) < 0)
349                         perror("BLKTRACESTOP");
350         }
351
352         done = 1;
353 }
354
355 static int get_dropped_count(const char *buts_name)
356 {
357         int fd;
358         char tmp[MAXPATHLEN + 64];
359
360         snprintf(tmp, sizeof(tmp), "%s/block/%s/dropped",
361                  debugfs_path, buts_name);
362
363         fd = open(tmp, O_RDONLY);
364         if (fd < 0) {
365                 /*
366                  * this may be ok, if the kernel doesn't support dropped counts
367                  */
368                 if (errno == ENOENT)
369                         return 0;
370
371                 fprintf(stderr, "Couldn't open dropped file %s\n", tmp);
372                 return -1;
373         }
374
375         if (read(fd, tmp, sizeof(tmp)) < 0) {
376                 perror(tmp);
377                 close(fd);
378                 return -1;
379         }
380
381         close(fd);
382
383         return atoi(tmp);
384 }
385
386 static int start_trace(struct device_information *dip)
387 {
388         struct blk_user_trace_setup buts;
389
390         memset(&buts, 0, sizeof(buts));
391         buts.buf_size = dip->buf_size;
392         buts.buf_nr = dip->buf_nr;
393         buts.act_mask = act_mask;
394
395         if (ioctl(dip->fd, BLKTRACESETUP, &buts) < 0) {
396                 perror("BLKTRACESETUP");
397                 return 1;
398         }
399
400         if (ioctl(dip->fd, BLKTRACESTART) < 0) {
401                 perror("BLKTRACESTART");
402                 return 1;
403         }
404
405         memcpy(dip->buts_name, buts.name, sizeof(dip->buts_name));
406         dip_set_tracing(dip, 1);
407         return 0;
408 }
409
410 static void stop_trace(struct device_information *dip)
411 {
412         if (dip_tracing(dip) || kill_running_trace) {
413                 dip_set_tracing(dip, 0);
414
415                 /*
416                  * should be stopped, just don't complain if it isn't
417                  */
418                 ioctl(dip->fd, BLKTRACESTOP);
419
420                 if (ioctl(dip->fd, BLKTRACETEARDOWN) < 0)
421                         perror("BLKTRACETEARDOWN");
422
423                 close(dip->fd);
424                 dip->fd = -1;
425         }
426 }
427
428 static void stop_all_traces(void)
429 {
430         struct device_information *dip;
431         int i;
432
433         for_each_dip(dip, i) {
434                 dip->drop_count = get_dropped_count(dip->buts_name);
435                 stop_trace(dip);
436         }
437 }
438
439 static void wait_for_data(struct thread_information *tip, int timeout)
440 {
441         struct pollfd pfd = { .fd = tip->fd, .events = POLLIN };
442
443         while (!is_done()) {
444                 if (poll(&pfd, 1, timeout) < 0) {
445                         perror("poll");
446                         break;
447                 }
448                 if (pfd.revents & POLLIN)
449                         break;
450                 if (tip->ofile_stdout)
451                         break;
452         }
453 }
454
455 static int read_data_file(struct thread_information *tip, void *buf,
456                           unsigned int len)
457 {
458         int ret = 0;
459
460         do {
461                 wait_for_data(tip, 100);
462
463                 ret = read(tip->fd, buf, len);
464                 if (!ret)
465                         continue;
466                 else if (ret > 0)
467                         return ret;
468                 else {
469                         if (errno != EAGAIN) {
470                                 perror(tip->fn);
471                                 fprintf(stderr,"Thread %d failed read of %s\n",
472                                         tip->cpu, tip->fn);
473                                 break;
474                         }
475                         continue;
476                 }
477         } while (!is_done());
478
479         return ret;
480
481 }
482
483 static int read_data_net(struct thread_information *tip, void *buf,
484                          unsigned int len)
485 {
486         struct net_connection *nc = tip->nc;
487         unsigned int bytes_left = len;
488         int ret = 0;
489
490         do {
491                 ret = recv(nc->in_fd, buf, bytes_left, MSG_WAITALL);
492
493                 if (!ret)
494                         continue;
495                 else if (ret < 0) {
496                         if (errno != EAGAIN) {
497                                 perror(tip->fn);
498                                 fprintf(stderr, "server: failed read\n");
499                                 return 0;
500                         }
501                         continue;
502                 } else {
503                         buf += ret;
504                         bytes_left -= ret;
505                 }
506         } while (!is_done() && bytes_left);
507
508         return len - bytes_left;
509 }
510
/*
 * Consumer side of the lock-free sub-buffer ring: pop the entry at
 * 'head', or return NULL when the ring is empty.
 */
static inline struct tip_subbuf *
subbuf_fifo_dequeue(struct thread_information *tip)
{
	const int head = tip->fifo.head;
	const int next = (head + 1) & (FIFO_SIZE - 1);

	/* empty when head has caught up with tail */
	if (head != tip->fifo.tail) {
		struct tip_subbuf *ts = tip->fifo.q[head];

		/*
		 * order the q[head] access before publishing the new
		 * head index to the producer
		 */
		store_barrier();
		tip->fifo.head = next;
		return ts;
	}

	return NULL;
}
527
/*
 * Producer side of the lock-free sub-buffer ring: append 'ts' at
 * 'tail'.  Returns 0 on success, 1 when the ring is full.
 */
static inline int subbuf_fifo_queue(struct thread_information *tip,
				    struct tip_subbuf *ts)
{
	const int tail = tip->fifo.tail;
	const int next = (tail + 1) & (FIFO_SIZE - 1);

	/* full when advancing tail would collide with head */
	if (next != tip->fifo.head) {
		tip->fifo.q[tail] = ts;
		/*
		 * publish the slot contents before moving tail, so the
		 * consumer never sees a tail covering an unwritten entry
		 */
		store_barrier();
		tip->fifo.tail = next;
		return 0;
	}

	fprintf(stderr, "fifo too small!\n");
	return 1;
}
544
/*
 * For file output, truncate and mmap the file appropriately.
 * Trace data is read by the kernel directly into the file mapping,
 * avoiding a copy.  Returns 0 on success, -1 on error.
 */
static int mmap_subbuf(struct thread_information *tip, unsigned int maxlen)
{
	int ofd = fileno(tip->ofile);
	int ret;
	unsigned long nr;

	/*
	 * extend file, if we have to. use chunks of 16 subbuffers.
	 */
	if (tip->fs_off + maxlen > tip->fs_buf_len) {
		/* retire the previous window first */
		if (tip->fs_buf) {
			munlock(tip->fs_buf, tip->fs_buf_len);
			munmap(tip->fs_buf, tip->fs_buf_len);
			tip->fs_buf = NULL;
		}

		/*
		 * keep the mmap offset page-aligned by carrying the
		 * sub-page remainder of fs_size into fs_off
		 */
		tip->fs_off = tip->fs_size & (tip->device->page_size - 1);
		nr = max(16, tip->device->buf_nr);
		tip->fs_buf_len = (nr * tip->device->buf_size) - tip->fs_off;
		tip->fs_max_size += tip->fs_buf_len;

		if (ftruncate(ofd, tip->fs_max_size) < 0) {
			perror("ftruncate");
			return -1;
		}

		tip->fs_buf = mmap(NULL, tip->fs_buf_len, PROT_WRITE,
				   MAP_SHARED, ofd, tip->fs_size - tip->fs_off);
		if (tip->fs_buf == MAP_FAILED) {
			perror("mmap");
			return -1;
		}
		/* best effort: failure to lock the pages is not fatal */
		mlock(tip->fs_buf, tip->fs_buf_len);
	}

	/* read straight into the file mapping */
	ret = tip->read_data(tip, tip->fs_buf + tip->fs_off, maxlen);
	if (ret >= 0) {
		tip->data_read += ret;
		tip->fs_size += ret;
		tip->fs_off += ret;
		return 0;
	}

	return -1;
}
593
594 /*
595  * Use the copy approach for pipes and network
596  */
597 static int get_subbuf(struct thread_information *tip, unsigned int maxlen)
598 {
599         struct tip_subbuf *ts = malloc(sizeof(*ts));
600         int ret;
601
602         ts->buf = malloc(tip->device->buf_size);
603         ts->max_len = maxlen;
604
605         ret = tip->read_data(tip, ts->buf, ts->max_len);
606         if (ret > 0) {
607                 ts->len = ret;
608                 tip->data_read += ret;
609                 if (subbuf_fifo_queue(tip, ts))
610                         ret = -1;
611         }
612
613         if (ret <= 0) {
614                 free(ts->buf);
615                 free(ts);
616         }
617
618         return ret;
619 }
620
621 static void close_thread(struct thread_information *tip)
622 {
623         if (tip->fd != -1)
624                 close(tip->fd);
625         if (tip->ofile)
626                 fclose(tip->ofile);
627         if (tip->ofile_buffer)
628                 free(tip->ofile_buffer);
629         if (tip->fd_buf)
630                 free(tip->fd_buf);
631
632         tip->fd = -1;
633         tip->ofile = NULL;
634         tip->ofile_buffer = NULL;
635         tip->fd_buf = NULL;
636 }
637
638 static void tip_ftrunc_final(struct thread_information *tip)
639 {
640         /*
641          * truncate to right size and cleanup mmap
642          */
643         if (tip->ofile_mmap && tip->ofile) {
644                 int ofd = fileno(tip->ofile);
645
646                 if (tip->fs_buf)
647                         munmap(tip->fs_buf, tip->fs_buf_len);
648
649                 ftruncate(ofd, tip->fs_size);
650         }
651 }
652
653 static void *thread_main(void *arg)
654 {
655         struct thread_information *tip = arg;
656         pid_t pid = getpid();
657         cpu_set_t cpu_mask;
658
659         CPU_ZERO(&cpu_mask);
660         CPU_SET((tip->cpu), &cpu_mask);
661
662         if (sched_setaffinity(pid, sizeof(cpu_mask), &cpu_mask) == -1) {
663                 perror("sched_setaffinity");
664                 exit_trace(1);
665         }
666
667         snprintf(tip->fn, sizeof(tip->fn), "%s/block/%s/trace%d",
668                         debugfs_path, tip->device->buts_name, tip->cpu);
669         tip->fd = open(tip->fn, O_RDONLY);
670         if (tip->fd < 0) {
671                 perror(tip->fn);
672                 fprintf(stderr,"Thread %d failed open of %s\n", tip->cpu,
673                         tip->fn);
674                 exit_trace(1);
675         }
676
677         while (!is_done()) {
678                 if (tip->get_subbuf(tip, tip->device->buf_size) < 0)
679                         break;
680         }
681
682         /*
683          * trace is stopped, pull data until we get a short read
684          */
685         while (tip->get_subbuf(tip, tip->device->buf_size) > 0)
686                 ;
687
688         tip_ftrunc_final(tip);
689         tip->exited = 1;
690         return NULL;
691 }
692
/*
 * Push the whole buffer out over the given socket, looping over short
 * sends.  Returns 0 on success, 1 on a send error.
 */
static int write_data_net(int fd, void *buf, unsigned int buf_len)
{
	unsigned int left;
	int ret;

	for (left = buf_len; left; left -= ret, buf += ret) {
		ret = send(fd, buf, left, 0);
		if (ret < 0) {
			perror("send");
			return 1;
		}
	}

	return 0;
}
711
712 static int net_send_header(struct thread_information *tip, unsigned int len)
713 {
714         struct blktrace_net_hdr hdr;
715
716         hdr.magic = BLK_IO_TRACE_MAGIC;
717         strcpy(hdr.buts_name, tip->device->buts_name);
718         hdr.cpu = tip->cpu;
719         hdr.max_cpus = ncpus;
720         hdr.len = len;
721         hdr.cl_id = getpid();
722         hdr.buf_size = tip->device->buf_size;
723         hdr.buf_nr = tip->device->buf_nr;
724         hdr.page_size = tip->device->page_size;
725         
726         return write_data_net(net_out_fd[tip->cpu], &hdr, sizeof(hdr));
727 }
728
729 /*
730  * send header with 0 length to signal end-of-run
731  */
732 static void net_client_send_close(void)
733 {
734         struct device_information *dip;
735         struct blktrace_net_hdr hdr;
736         int i;
737
738         for_each_dip(dip, i) {
739                 hdr.magic = BLK_IO_TRACE_MAGIC;
740                 hdr.max_cpus = ncpus;
741                 hdr.len = 0;
742                 strcpy(hdr.buts_name, dip->buts_name);
743                 hdr.cpu = get_dropped_count(dip->buts_name);
744                 hdr.cl_id = getpid();
745                 hdr.buf_size = dip->buf_size;
746                 hdr.buf_nr = dip->buf_nr;
747                 hdr.page_size = dip->page_size;
748
749                 write_data_net(net_out_fd[0], &hdr, sizeof(hdr));
750         }
751
752 }
753
754 static int flush_subbuf_net(struct thread_information *tip,
755                             struct tip_subbuf *ts)
756 {
757         if (net_send_header(tip, ts->len))
758                 return -1;
759         if (write_data_net(net_out_fd[tip->cpu], ts->buf, ts->len))
760                 return -1;
761
762         free(ts->buf);
763         free(ts);
764         return 1;
765 }
766
767 static int net_sendfile(struct thread_information *tip, struct tip_subbuf *ts)
768 {
769         int ret = sendfile(net_out_fd[tip->cpu], tip->fd, NULL, ts->len);
770
771         if (ret < 0) {
772                 perror("sendfile");
773                 return 1;
774         } else if (ret < (int) ts->len) {
775                 fprintf(stderr, "short sendfile send (%d of %d)\n", ret, ts->len);
776                 return 1;
777         }
778
779         return 0;
780 }
781
782 static int flush_subbuf_sendfile(struct thread_information *tip,
783                                  struct tip_subbuf *ts)
784 {
785         int ret = -1;
786
787         if (net_send_header(tip, ts->len))
788                 goto err;
789         if (net_sendfile(tip, ts))
790                 goto err;
791
792         tip->data_read += ts->len;
793         ret = 1;
794 err:
795         free(ts);
796         return ret;
797 }
798
799 static int get_subbuf_sendfile(struct thread_information *tip,
800                                __attribute__((__unused__)) unsigned int maxlen)
801 {
802         struct tip_subbuf *ts;
803         struct stat sb;
804         unsigned int ready;
805
806         wait_for_data(tip, -1);
807
808         if (fstat(tip->fd, &sb) < 0) {
809                 perror("trace stat");
810                 return -1;
811         }
812
813         ready = sb.st_size - tip->data_queued;
814         if (!ready) {
815                 usleep(1000);
816                 return 0;
817         }
818
819         ts = malloc(sizeof(*ts));
820         ts->buf = NULL;
821         ts->max_len = 0;
822         ts->len = ready;
823         tip->data_queued += ready;
824
825         if (flush_subbuf_sendfile(tip, ts) < 0)
826                 return -1;
827
828         return ready;
829 }
830
831 static int write_data(struct thread_information *tip, void *buf,
832                       unsigned int buf_len)
833 {
834         int ret;
835
836         if (!buf_len)
837                 return 0;
838
839         ret = fwrite(buf, buf_len, 1, tip->ofile);
840         if (ferror(tip->ofile) || ret != 1) {
841                 perror("fwrite");
842                 clearerr(tip->ofile);
843                 return 1;
844         }
845
846         if (tip->ofile_stdout)
847                 fflush(tip->ofile);
848
849         return 0;
850 }
851
852 static int flush_subbuf_file(struct thread_information *tip,
853                              struct tip_subbuf *ts)
854 {
855         unsigned int offset = 0;
856         struct blk_io_trace *t;
857         int pdu_len, events = 0;
858
859         /*
860          * surplus from last run
861          */
862         if (tip->leftover_ts) {
863                 struct tip_subbuf *prev_ts = tip->leftover_ts;
864
865                 if (prev_ts->len + ts->len > prev_ts->max_len) {
866                         prev_ts->max_len += ts->len;
867                         prev_ts->buf = realloc(prev_ts->buf, prev_ts->max_len);
868                 }
869
870                 memcpy(prev_ts->buf + prev_ts->len, ts->buf, ts->len);
871                 prev_ts->len += ts->len;
872
873                 free(ts->buf);
874                 free(ts);
875
876                 ts = prev_ts;
877                 tip->leftover_ts = NULL;
878         }
879
880         while (offset + sizeof(*t) <= ts->len) {
881                 t = ts->buf + offset;
882
883                 if (verify_trace(t)) {
884                         write_data(tip, ts->buf, offset);
885                         return -1;
886                 }
887
888                 pdu_len = t->pdu_len;
889
890                 if (offset + sizeof(*t) + pdu_len > ts->len)
891                         break;
892
893                 offset += sizeof(*t) + pdu_len;
894                 tip->events_processed++;
895                 tip->data_read += sizeof(*t) + pdu_len;
896                 events++;
897         }
898
899         if (write_data(tip, ts->buf, offset))
900                 return -1;
901
902         /*
903          * leftover bytes, save them for next time
904          */
905         if (offset != ts->len) {
906                 tip->leftover_ts = ts;
907                 ts->len -= offset;
908                 memmove(ts->buf, ts->buf + offset, ts->len);
909         } else {
910                 free(ts->buf);
911                 free(ts);
912         }
913
914         return events;
915 }
916
917 static int write_tip_events(struct thread_information *tip)
918 {
919         struct tip_subbuf *ts = subbuf_fifo_dequeue(tip);
920
921         if (ts)
922                 return tip->flush_subbuf(tip, ts);
923
924         return 0;
925 }
926
927 /*
928  * scans the tips we know and writes out the subbuffers we accumulate
929  */
930 static void get_and_write_events(void)
931 {
932         struct device_information *dip;
933         struct thread_information *tip;
934         int i, j, events, ret, tips_running;
935
936         while (!is_done()) {
937                 events = 0;
938
939                 for_each_dip(dip, i) {
940                         for_each_tip(dip, tip, j) {
941                                 ret = write_tip_events(tip);
942                                 if (ret > 0)
943                                         events += ret;
944                         }
945                 }
946
947                 if (!events)
948                         usleep(100000);
949         }
950
951         /*
952          * reap stored events
953          */
954         do {
955                 events = 0;
956                 tips_running = 0;
957                 for_each_dip(dip, i) {
958                         for_each_tip(dip, tip, j) {
959                                 ret = write_tip_events(tip);
960                                 if (ret > 0)
961                                         events += ret;
962                                 tips_running += !tip->exited;
963                         }
964                 }
965                 usleep(10);
966         } while (events || tips_running);
967 }
968
969 static void wait_for_threads(void)
970 {
971         /*
972          * for piped or network output, poll and fetch data for writeout.
973          * for files, we just wait around for trace threads to exit
974          */
975         if ((output_name && !strcmp(output_name, "-")) ||
976             ((net_mode == Net_client) && !net_use_sendfile))
977                 get_and_write_events();
978         else {
979                 struct device_information *dip;
980                 struct thread_information *tip;
981                 int i, j, tips_running;
982
983                 do {
984                         tips_running = 0;
985                         usleep(100000);
986
987                         for_each_dip(dip, i)
988                                 for_each_tip(dip, tip, j)
989                                         tips_running += !tip->exited;
990                 } while (tips_running);
991         }
992
993         if (net_mode == Net_client)
994                 net_client_send_close();
995 }
996
/*
 * Build the per-cpu output file name into 'dst' and make sure the
 * destination directory exists.  Returns 0 on success, 1 on error.
 *
 * NOTE(review): dst's capacity is not passed in, so the sprintf and
 * strftime calls below are unbounded writes -- presumably callers pass
 * a MAXPATHLEN-sized buffer; confirm at the call sites.
 */
static int fill_ofname(struct device_information *dip,
		       struct thread_information *tip, char *dst,
		       char *buts_name)
{
	struct stat sb;
	int len = 0;

	if (output_dir)
		len = sprintf(dst, "%s/", output_dir);
	else
		len = sprintf(dst, "./");

	/* server mode: add a per-client "<ip>-<connect time>/" subdirectory */
	if (net_mode == Net_server) {
		struct net_connection *nc = tip->nc;

		len += sprintf(dst + len, "%s-", inet_ntoa(nc->ch->cl_in_addr));
		len += strftime(dst + len, 64, "%F-%T/", gmtime(&dip->cl_connect_time));
	}

	/* create the directory if it isn't there yet */
	if (stat(dst, &sb) < 0) {
		if (errno != ENOENT) {
			perror("stat");
			return 1;
		}
		if (mkdir(dst, 0755) < 0) {
			perror(dst);
			fprintf(stderr, "Can't make output dir\n");
			return 1;
		}
	}

	if (output_name)
		sprintf(dst + len, "%s.blktrace.%d", output_name, tip->cpu);
	else
		sprintf(dst + len, "%s.blktrace.%d", buts_name, tip->cpu);

	return 0;
}
1035
1036 static void fill_ops(struct thread_information *tip)
1037 {
1038         /*
1039          * setup ops
1040          */
1041         if (net_mode == Net_client) {
1042                 if (net_use_sendfile) {
1043                         tip->get_subbuf = get_subbuf_sendfile;
1044                         tip->flush_subbuf = NULL;
1045                 } else {
1046                         tip->get_subbuf = get_subbuf;
1047                         tip->flush_subbuf = flush_subbuf_net;
1048                 }
1049         } else {
1050                 if (tip->ofile_mmap)
1051                         tip->get_subbuf = mmap_subbuf;
1052                 else
1053                         tip->get_subbuf = get_subbuf;
1054
1055                 tip->flush_subbuf = flush_subbuf_file;
1056         }
1057                         
1058         if (net_mode == Net_server)
1059                 tip->read_data = read_data_net;
1060         else
1061                 tip->read_data = read_data_file;
1062 }
1063
1064 static int tip_open_output(struct device_information *dip,
1065                            struct thread_information *tip)
1066 {
1067         int pipeline = output_name && !strcmp(output_name, "-");
1068         int mode, vbuf_size;
1069         char op[128];
1070
1071         if (net_mode == Net_client) {
1072                 tip->ofile = NULL;
1073                 tip->ofile_stdout = 0;
1074                 tip->ofile_mmap = 0;
1075                 goto done;
1076         } else if (pipeline) {
1077                 tip->ofile = fdopen(STDOUT_FILENO, "w");
1078                 tip->ofile_stdout = 1;
1079                 tip->ofile_mmap = 0;
1080                 mode = _IOLBF;
1081                 vbuf_size = 512;
1082         } else {
1083                 if (fill_ofname(dip, tip, op, dip->buts_name))
1084                         return 1;
1085                 tip->ofile = fopen(op, "w+");
1086                 tip->ofile_stdout = 0;
1087                 tip->ofile_mmap = 1;
1088                 mode = _IOFBF;
1089                 vbuf_size = OFILE_BUF;
1090         }
1091
1092         if (tip->ofile == NULL) {
1093                 perror(op);
1094                 return 1;
1095         }
1096
1097         tip->ofile_buffer = malloc(vbuf_size);
1098         if (setvbuf(tip->ofile, tip->ofile_buffer, mode, vbuf_size)) {
1099                 perror("setvbuf");
1100                 close_thread(tip);
1101                 return 1;
1102         }
1103
1104 done:
1105         fill_ops(tip);
1106         return 0;
1107 }
1108
1109 static int start_threads(struct device_information *dip)
1110 {
1111         struct thread_information *tip;
1112         int j;
1113
1114         for_each_tip(dip, tip, j) {
1115                 tip->cpu = j;
1116                 tip->device = dip;
1117                 tip->events_processed = 0;
1118                 tip->fd = -1;
1119                 memset(&tip->fifo, 0, sizeof(tip->fifo));
1120                 tip->leftover_ts = NULL;
1121
1122                 if (tip_open_output(dip, tip))
1123                         return 1;
1124
1125                 if (pthread_create(&tip->thread, NULL, thread_main, tip)) {
1126                         perror("pthread_create");
1127                         close_thread(tip);
1128                         return 1;
1129                 }
1130         }
1131
1132         return 0;
1133 }
1134
1135 static void stop_threads(struct device_information *dip)
1136 {
1137         struct thread_information *tip;
1138         unsigned long ret;
1139         int i;
1140
1141         for_each_tip(dip, tip, i) {
1142                 (void) pthread_join(tip->thread, (void *) &ret);
1143                 close_thread(tip);
1144         }
1145 }
1146
1147 static void stop_all_threads(void)
1148 {
1149         struct device_information *dip;
1150         int i;
1151
1152         for_each_dip(dip, i)
1153                 stop_threads(dip);
1154 }
1155
1156 static void stop_all_tracing(void)
1157 {
1158         struct device_information *dip;
1159         int i;
1160
1161         for_each_dip(dip, i)
1162                 stop_trace(dip);
1163 }
1164
1165 static void exit_trace(int status)
1166 {
1167         if (!is_trace_stopped()) {
1168                 trace_stopped = 1;
1169                 stop_all_threads();
1170                 stop_all_tracing();
1171         }
1172
1173         exit(status);
1174 }
1175
1176 static int resize_devices(char *path)
1177 {
1178         int size = (ndevs + 1) * sizeof(struct device_information);
1179
1180         device_information = realloc(device_information, size);
1181         if (!device_information) {
1182                 fprintf(stderr, "Out of memory, device %s (%d)\n", path, size);
1183                 return 1;
1184         }
1185         device_information[ndevs].path = path;
1186         ndevs++;
1187         return 0;
1188 }
1189
1190 static int open_devices(void)
1191 {
1192         struct device_information *dip;
1193         int i;
1194
1195         for_each_dip(dip, i) {
1196                 dip->fd = open(dip->path, O_RDONLY | O_NONBLOCK);
1197                 if (dip->fd < 0) {
1198                         perror(dip->path);
1199                         return 1;
1200                 }
1201                 dip->buf_size = buf_size;
1202                 dip->buf_nr = buf_nr;
1203                 dip->page_size = page_size;
1204         }
1205
1206         return 0;
1207 }
1208
/*
 * Allocate the flat per-device/per-cpu thread array, arm tracing on
 * every device and start the worker threads. On partial failure the
 * already-started devices are unwound. Returns 0 on success, 1 on error.
 */
static int start_devices(void)
{
	struct device_information *dip;
	int i, j, size;

	/* one thread_information per cpu per device, one flat array */
	size = ncpus * sizeof(struct thread_information);
	thread_information = malloc(size * ndevs);
	if (!thread_information) {
		fprintf(stderr, "Out of memory, threads (%d)\n", size * ndevs);
		return 1;
	}

	/* arm tracing on each device, stopping at the first failure */
	for_each_dip(dip, i) {
		if (start_trace(dip)) {
			close(dip->fd);
			fprintf(stderr, "Failed to start trace on %s\n",
				dip->path);
			break;
		}
	}

	/*
	 * i < ndevs means the loop broke early; unwind the devices that
	 * did start (presumably __for_each_dip limits iteration to the
	 * first i entries -- macro defined in blktrace.h, verify there).
	 */
	if (i != ndevs) {
		__for_each_dip(dip, device_information, i, j)
			stop_trace(dip);

		return 1;
	}

	/* hand each device its slice of the thread array, spawn workers */
	for_each_dip(dip, i) {
		dip->threads = thread_information + (i * ncpus);
		if (start_threads(dip)) {
			fprintf(stderr, "Failed to start worker threads\n");
			break;
		}
	}

	/* same partial-failure unwind, plus disarming all traces */
	if (i != ndevs) {
		__for_each_dip(dip, device_information, i, j)
			stop_threads(dip);
		for_each_dip(dip, i)
			stop_trace(dip);

		return 1;
	}

	return 0;
}
1256
1257 static void show_stats(struct device_information *dips, int ndips, int cpus)
1258 {
1259         struct device_information *dip;
1260         struct thread_information *tip;
1261         unsigned long long events_processed, data_read;
1262         unsigned long total_drops;
1263         int i, j, no_stdout = 0;
1264
1265         if (is_stat_shown())
1266                 return;
1267
1268         if (output_name && !strcmp(output_name, "-"))
1269                 no_stdout = 1;
1270
1271         stat_shown = 1;
1272
1273         total_drops = 0;
1274         __for_each_dip(dip, dips, ndips, i) {
1275                 if (!no_stdout)
1276                         printf("Device: %s\n", dip->path);
1277                 events_processed = 0;
1278                 data_read = 0;
1279                 __for_each_tip(dip, tip, cpus, j) {
1280                         if (!no_stdout)
1281                                 printf("  CPU%3d: %20lu events, %8llu KiB data\n",
1282                                         tip->cpu, tip->events_processed,
1283                                         (tip->data_read + 1023) >> 10);
1284                         events_processed += tip->events_processed;
1285                         data_read += tip->data_read;
1286                 }
1287                 total_drops += dip->drop_count;
1288                 if (!no_stdout)
1289                         printf("  Total:  %20llu events (dropped %lu), %8llu KiB data\n",
1290                                         events_processed, dip->drop_count,
1291                                         (data_read + 1023) >> 10);
1292         }
1293
1294         if (total_drops)
1295                 fprintf(stderr, "You have dropped events, consider using a larger buffer size (-b)\n");
1296 }
1297
1298 static struct device_information *net_get_dip(struct net_connection *nc,
1299                                               struct blktrace_net_hdr *bnh)
1300 {
1301         struct device_information *dip, *cl_dip = NULL;
1302         struct cl_host *ch = nc->ch;
1303         int i;
1304
1305         for (i = 0; i < ch->ndevs; i++) {
1306                 dip = &ch->device_information[i];
1307
1308                 if (!strcmp(dip->buts_name, bnh->buts_name))
1309                         return dip;
1310
1311                 if (dip->cl_id == bnh->cl_id)
1312                         cl_dip = dip;
1313         }
1314
1315         ch->device_information = realloc(ch->device_information, (ch->ndevs + 1) * sizeof(*dip));
1316         dip = &ch->device_information[ch->ndevs];
1317         memset(dip, 0, sizeof(*dip));
1318         dip->fd = -1;
1319         dip->ch = ch;
1320         dip->cl_id = bnh->cl_id;
1321         dip->buf_size = bnh->buf_size;
1322         dip->buf_nr = bnh->buf_nr;
1323         dip->page_size = bnh->page_size;
1324
1325         if (cl_dip)
1326                 dip->cl_connect_time = cl_dip->cl_connect_time;
1327         else
1328                 dip->cl_connect_time = nc->connect_time;
1329         strcpy(dip->buts_name, bnh->buts_name);
1330         dip->path = strdup(bnh->buts_name);
1331         dip->trace_started = 1;
1332         ch->ndevs++;
1333         dip->threads = malloc(nc->ncpus * sizeof(struct thread_information));
1334         memset(dip->threads, 0, nc->ncpus * sizeof(struct thread_information));
1335
1336         /*
1337          * open all files
1338          */
1339         for (i = 0; i < nc->ncpus; i++) {
1340                 struct thread_information *tip = &dip->threads[i];
1341
1342                 tip->cpu = i;
1343                 tip->device = dip;
1344                 tip->fd = -1;
1345                 tip->nc = nc;
1346                 
1347                 if (tip_open_output(dip, tip))
1348                         return NULL;
1349
1350                 tip->nc = NULL;
1351         }
1352
1353         return dip;
1354 }
1355
1356 static struct thread_information *net_get_tip(struct net_connection *nc,
1357                                               struct blktrace_net_hdr *bnh)
1358 {
1359         struct device_information *dip;
1360         struct thread_information *tip;
1361
1362         dip = net_get_dip(nc, bnh);
1363         if (!dip->trace_started) {
1364                 fprintf(stderr, "Events for closed devices %s\n", dip->buts_name);
1365                 return NULL;
1366         }
1367
1368         tip = &dip->threads[bnh->cpu];
1369         if (!tip->nc)
1370                 tip->nc = nc;
1371         
1372         return tip;
1373 }
1374
1375 static int net_get_header(struct net_connection *nc,
1376                           struct blktrace_net_hdr *bnh)
1377 {
1378         int fl = fcntl(nc->in_fd, F_GETFL);
1379         int bytes_left, ret;
1380         void *p = bnh;
1381
1382         fcntl(nc->in_fd, F_SETFL, fl | O_NONBLOCK);
1383         bytes_left = sizeof(*bnh);
1384         while (bytes_left && !is_done()) {
1385                 ret = recv(nc->in_fd, p, bytes_left, MSG_WAITALL);
1386                 if (ret < 0) {
1387                         if (errno != EAGAIN) {
1388                                 perror("recv header");
1389                                 return 1;
1390                         }
1391                         usleep(1000);
1392                         continue;
1393                 } else if (!ret) {
1394                         usleep(1000);
1395                         continue;
1396                 } else {
1397                         p += ret;
1398                         bytes_left -= ret;
1399                 }
1400         }
1401         fcntl(nc->in_fd, F_SETFL, fl & ~O_NONBLOCK);
1402         return bytes_left;
1403 }
1404
1405 /*
1406  * finalize a net client: truncate files, show stats, cleanup, etc
1407  */
1408 static void device_done(struct net_connection *nc, struct device_information *dip)
1409 {
1410         struct thread_information *tip;
1411         int i;
1412
1413         __for_each_tip(dip, tip, nc->ncpus, i)
1414                 tip_ftrunc_final(tip);
1415
1416         show_stats(dip, 1, nc->ncpus);
1417
1418         /*
1419          * cleanup for next run
1420          */
1421         __for_each_tip(dip, tip, nc->ncpus, i) {
1422                 if (tip->ofile)
1423                         fclose(tip->ofile);
1424         }
1425
1426         free(dip->threads);
1427         free(dip->path);
1428
1429         close(nc->in_fd);
1430         nc->in_fd = -1;
1431
1432         stat_shown = 0;
1433 }
1434
/* two IPv4 addresses are equal iff their raw 32-bit values match */
static inline int in_addr_eq(struct in_addr a, struct in_addr b)
{
	return a.s_addr == b.s_addr ? 1 : 0;
}
1439
1440 static void net_add_client_host(struct cl_host *ch)
1441 {
1442         ch->list_next = cl_host_list;
1443         cl_host_list = ch;
1444         cl_hosts++;
1445 }
1446
1447 static void net_remove_client_host(struct cl_host *ch)
1448 {
1449         struct cl_host *p, *c;
1450         
1451         for (p = c = cl_host_list; c; c = c->list_next) {
1452                 if (c == ch) {
1453                         if (p == c)
1454                                 cl_host_list = c->list_next;
1455                         else
1456                                 p->list_next = c->list_next;
1457                         cl_hosts--;
1458                         return;
1459                 }
1460                 p = c;
1461         }
1462 }
1463
1464 static struct cl_host *net_find_client_host(struct in_addr cl_in_addr)
1465 {
1466         struct cl_host *ch = cl_host_list;
1467
1468         while (ch) {
1469                 if (in_addr_eq(ch->cl_in_addr, cl_in_addr))
1470                         return ch;
1471                 ch = ch->list_next;
1472         }
1473
1474         return NULL;
1475 }
1476
1477 static void net_client_host_done(struct cl_host *ch)
1478 {
1479         free(ch->device_information);
1480         free(ch->net_connections);
1481         net_connects -= ch->nconn;
1482         net_remove_client_host(ch);
1483         free(ch);
1484 }
1485
1486 /*
1487  * handle incoming events from a net client
1488  */
1489 static int net_client_data(struct net_connection *nc)
1490 {
1491         struct thread_information *tip;
1492         struct blktrace_net_hdr bnh;
1493
1494         if (net_get_header(nc, &bnh))
1495                 return 1;
1496
1497         if (data_is_native == -1 && check_data_endianness(bnh.magic)) {
1498                 fprintf(stderr, "server: received data is bad\n");
1499                 return 1;
1500         }
1501
1502         if (!data_is_native) {
1503                 bnh.magic = be32_to_cpu(bnh.magic);
1504                 bnh.cpu = be32_to_cpu(bnh.cpu);
1505                 bnh.max_cpus = be32_to_cpu(bnh.max_cpus);
1506                 bnh.len = be32_to_cpu(bnh.len);
1507                 bnh.cl_id = be32_to_cpu(bnh.cl_id);
1508                 bnh.buf_size = be32_to_cpu(bnh.buf_size);
1509                 bnh.buf_nr = be32_to_cpu(bnh.buf_nr);
1510                 bnh.page_size = be32_to_cpu(bnh.page_size);
1511         }
1512
1513         if ((bnh.magic & 0xffffff00) != BLK_IO_TRACE_MAGIC) {
1514                 fprintf(stderr, "server: bad data magic\n");
1515                 return 1;
1516         }
1517
1518         if (nc->ncpus == -1)
1519                 nc->ncpus = bnh.max_cpus;
1520
1521         /*
1522          * len == 0 means that the other end signalled end-of-run
1523          */
1524         if (!bnh.len) {
1525                 /*
1526                  * overload cpu count with dropped events
1527                  */
1528                 struct device_information *dip;
1529
1530                 dip = net_get_dip(nc, &bnh);
1531                 dip->drop_count = bnh.cpu;
1532                 dip->trace_started = 0;
1533
1534                 printf("server: end of run for %s\n", dip->buts_name);
1535
1536                 device_done(nc, dip);
1537
1538                 if (++nc->ch->ndevs_done == nc->ch->ndevs)
1539                         net_client_host_done(nc->ch);
1540
1541                 return 0;
1542         }
1543
1544         tip = net_get_tip(nc, &bnh);
1545         if (!tip)
1546                 return 1;
1547
1548         if (mmap_subbuf(tip, bnh.len))
1549                 return 1;
1550
1551         return 0;
1552 }
1553
1554 static void net_add_connection(int listen_fd, struct sockaddr_in *addr)
1555 {
1556         socklen_t socklen = sizeof(*addr);
1557         struct net_connection *nc;
1558         struct cl_host *ch;
1559         int in_fd;
1560
1561         in_fd = accept(listen_fd, (struct sockaddr *) addr, &socklen);
1562         if (in_fd < 0) {
1563                 perror("accept");
1564                 return;
1565         }
1566
1567         ch = net_find_client_host(addr->sin_addr);
1568         if (!ch) {
1569                 if (cl_hosts == NET_MAX_CL_HOSTS) {
1570                         fprintf(stderr, "server: no more clients allowed\n");
1571                         return;
1572                 }
1573                 ch = malloc(sizeof(struct cl_host));
1574                 memset(ch, 0, sizeof(*ch));
1575                 ch->cl_in_addr = addr->sin_addr;
1576                 net_add_client_host(ch);
1577
1578                 printf("server: connection from %s\n", inet_ntoa(addr->sin_addr));
1579         }
1580
1581         ch->net_connections = realloc(ch->net_connections, (ch->nconn + 1) * sizeof(*nc));
1582         nc = &ch->net_connections[ch->nconn++];
1583         memset(nc, 0, sizeof(*nc));
1584
1585         time(&nc->connect_time);
1586         nc->ch = ch;
1587         nc->in_fd = in_fd;
1588         nc->ncpus = -1;
1589         net_connects++;
1590 }
1591
1592 /*
1593  * event driven loop, handle new incoming connections and data from
1594  * existing connections
1595  */
1596 static void net_server_handle_connections(int listen_fd,
1597                                           struct sockaddr_in *addr)
1598 {
1599         struct pollfd *pfds = NULL;
1600         struct net_connection **ncs = NULL;
1601         int max_connects = 0;
1602         int i, nconns, events;
1603         struct cl_host *ch;
1604         struct net_connection *nc;
1605         
1606         printf("server: waiting for connections...\n");
1607
1608         while (!is_done()) {
1609                 if (net_connects >= max_connects) {
1610                         pfds = realloc(pfds, (net_connects + 1) * sizeof(*pfds));
1611                         ncs = realloc(ncs, (net_connects + 1) * sizeof(*ncs));
1612                         max_connects = net_connects + 1;
1613                 }
1614                 /*
1615                  * the zero entry is for incoming connections, remaining
1616                  * entries for clients
1617                  */
1618                 pfds[0].fd = listen_fd;
1619                 pfds[0].events = POLLIN;
1620                 nconns = 0;
1621                 for_each_cl_host(ch) {
1622                         for (i = 0; i < ch->nconn; i++) {
1623                                 nc = &ch->net_connections[i];
1624                                 pfds[nconns + 1].fd = nc->in_fd;
1625                                 pfds[nconns + 1].events = POLLIN;
1626                                 ncs[nconns++] = nc;
1627                         }
1628                 }
1629
1630                 events = poll(pfds, 1 + nconns, -1);
1631                 if (events < 0) {
1632                         if (errno == EINTR)
1633                                 continue;
1634
1635                         perror("poll");
1636                         break;
1637                 } else if (!events)
1638                         continue;
1639
1640                 if (pfds[0].revents & POLLIN) {
1641                         net_add_connection(listen_fd, addr);
1642                         events--;
1643                 }
1644
1645                 for (i = 0; events && i < nconns; i++) {
1646                         if (pfds[i + 1].revents & POLLIN) {
1647                                 net_client_data(ncs[i]);
1648                                 events--;
1649                         }
1650                 }
1651         }
1652 }
1653
1654 /*
1655  * Start here when we are in server mode - just fetch data from the network
1656  * and dump to files
1657  */
1658 static int net_server(void)
1659 {
1660         struct sockaddr_in addr;
1661         int fd, opt;
1662
1663         fd = socket(AF_INET, SOCK_STREAM, 0);
1664         if (fd < 0) {
1665                 perror("server: socket");
1666                 return 1;
1667         }
1668
1669         opt = 1;
1670         if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) {
1671                 perror("setsockopt");
1672                 return 1;
1673         }
1674
1675         memset(&addr, 0, sizeof(addr));
1676         addr.sin_family = AF_INET;
1677         addr.sin_addr.s_addr = htonl(INADDR_ANY);
1678         addr.sin_port = htons(net_port);
1679
1680         if (bind(fd, (struct sockaddr *) &addr, sizeof(addr)) < 0) {
1681                 perror("bind");
1682                 return 1;
1683         }
1684
1685         if (listen(fd, 1) < 0) {
1686                 perror("listen");
1687                 return 1;
1688         }
1689
1690         net_server_handle_connections(fd, &addr);
1691         return 0;
1692 }
1693
1694 /*
1695  * Setup outgoing network connection where we will transmit data
1696  */
1697 static int net_setup_client_cpu(int i, struct sockaddr_in *addr)
1698 {
1699         int fd;
1700
1701         fd = socket(AF_INET, SOCK_STREAM, 0);
1702         if (fd < 0) {
1703                 perror("client: socket");
1704                 return 1;
1705         }
1706
1707         if (connect(fd, (struct sockaddr *) addr, sizeof(*addr)) < 0) {
1708                 perror("client: connect");
1709                 return 1;
1710         }
1711
1712         net_out_fd[i] = fd;
1713         return 0;
1714 }
1715
1716 static int net_setup_client(void)
1717 {
1718         struct sockaddr_in addr;
1719         int i;
1720
1721         memset(&addr, 0, sizeof(addr));
1722         addr.sin_family = AF_INET;
1723         addr.sin_port = htons(net_port);
1724
1725         if (inet_aton(hostname, &addr.sin_addr) != 1) {
1726                 struct hostent *hent = gethostbyname(hostname);
1727                 if (!hent) {
1728                         perror("gethostbyname");
1729                         return 1;
1730                 }
1731
1732                 memcpy(&addr.sin_addr, hent->h_addr, 4);
1733                 strcpy(hostname, hent->h_name);
1734         }
1735
1736         printf("blktrace: connecting to %s\n", hostname);
1737
1738         net_out_fd = malloc(ncpus * sizeof(*net_out_fd));
1739         for (i = 0; i < ncpus; i++) {
1740                 if (net_setup_client_cpu(i, &addr))
1741                         return 1;
1742         }
1743
1744         printf("blktrace: connected!\n");
1745         
1746         return 0;
1747 }
1748
/*
 * Option summary printed by show_usage(); the version flag is -V
 * (capital) per S_OPTS -- the summary previously advertised "-v".
 */
static char usage_str[] = \
	"-d <dev> [ -r debugfs path ] [ -o <output> ] [-k ] [ -w time ]\n" \
	"[ -a action ] [ -A action mask ] [ -V ]\n\n" \
	"\t-d Use specified device. May also be given last after options\n" \
	"\t-r Path to mounted debugfs, defaults to /debug\n" \
	"\t-o File(s) to send output to\n" \
	"\t-D Directory to prepend to output file names\n" \
	"\t-k Kill a running trace\n" \
	"\t-w Stop after defined time, in seconds\n" \
	"\t-a Only trace specified actions. See documentation\n" \
	"\t-A Give trace mask as a single value. See documentation\n" \
	"\t-b Sub buffer size in KiB\n" \
	"\t-n Number of sub buffers\n" \
	"\t-l Run in network listen mode (blktrace server)\n" \
	"\t-h Run in network client mode, connecting to the given host\n" \
	"\t-p Network port to use (default 8462)\n" \
	"\t-s Make the network client NOT use sendfile() to transfer data\n" \
	"\t-V Print program version info\n\n";
1767
1768 static void show_usage(char *program)
1769 {
1770         fprintf(stderr, "Usage: %s %s %s",program, blktrace_version, usage_str);
1771 }
1772
/*
 * Parse the command line, then either run as a network server, kill a
 * running trace (-k), or start tracing all requested devices and wait
 * for completion.
 */
int main(int argc, char *argv[])
{
	static char default_debugfs_path[] = "/debug";
	struct statfs st;
	int i, c;
	int stop_watch = 0;	/* -w: stop automatically after N seconds */
	int act_mask_tmp = 0;	/* accumulated -a/-A action filter mask */

	while ((c = getopt_long(argc, argv, S_OPTS, l_opts, NULL)) >= 0) {
		switch (c) {
		case 'a':
			/* named action filter, OR-ed into the mask */
			i = find_mask_map(optarg);
			if (i < 0) {
				fprintf(stderr,"Invalid action mask %s\n",
					optarg);
				return 1;
			}
			act_mask_tmp |= i;
			break;

		case 'A':
			/* raw hex mask; replaces any earlier -a values */
			if ((sscanf(optarg, "%x", &i) != 1) || 
							!valid_act_opt(i)) {
				fprintf(stderr,
					"Invalid set action mask %s/0x%x\n",
					optarg, i);
				return 1;
			}
			act_mask_tmp = i;
			break;

		case 'd':
			if (resize_devices(optarg) != 0)
				return 1;
			break;

		case 'r':
			debugfs_path = optarg;
			break;

		case 'o':
			output_name = optarg;
			break;
		case 'k':
			kill_running_trace = 1;
			break;
		case 'w':
			stop_watch = atoi(optarg);
			if (stop_watch <= 0) {
				fprintf(stderr,
					"Invalid stopwatch value (%d secs)\n",
					stop_watch);
				return 1;
			}
			break;
		case 'V':
			printf("%s version %s\n", argv[0], blktrace_version);
			return 0;
		case 'b':
			/*
			 * NOTE(review): buf_size appears unsigned (printed
			 * with %lu), so <= 0 only catches 0 here.
			 */
			buf_size = strtoul(optarg, NULL, 10);
			if (buf_size <= 0 || buf_size > 16*1024) {
				fprintf(stderr,
					"Invalid buffer size (%lu)\n",buf_size);
				return 1;
			}
			/* user gives KiB, kernel wants bytes */
			buf_size <<= 10;
			break;
		case 'n':
			buf_nr = strtoul(optarg, NULL, 10);
			if (buf_nr <= 0) {
				fprintf(stderr,
					"Invalid buffer nr (%lu)\n", buf_nr);
				return 1;
			}
			break;
		case 'D':
			output_dir = optarg;
			break;
		case 'h':
			/*
			 * NOTE(review): assumes hostname[] can hold
			 * optarg -- unbounded strcpy, verify buffer size
			 */
			net_mode = Net_client;
			strcpy(hostname, optarg);
			break;
		case 'l':
			net_mode = Net_server;
			break;
		case 'p':
			net_port = atoi(optarg);
			break;
		case 's':
			net_use_sendfile = 0;
			break;
		default:
			show_usage(argv[0]);
			return 1;
		}
	}

	setlocale(LC_NUMERIC, "en_US");

	page_size = getpagesize();

	/* server mode runs its own event loop and never returns here */
	if (net_mode == Net_server)
		return net_server();

	/* remaining non-option arguments are devices, same as -d */
	while (optind < argc) {
		if (resize_devices(argv[optind++]) != 0)
			return 1;
	}

	if (ndevs == 0) {
		show_usage(argv[0]);
		return 1;
	}

	if (act_mask_tmp != 0)
		act_mask = act_mask_tmp;

	if (!debugfs_path)
		debugfs_path = default_debugfs_path;

	/* verify the given path is actually a mounted debugfs */
	if (statfs(debugfs_path, &st) < 0) {
		perror("statfs");
		fprintf(stderr,"%s does not appear to be a valid path\n",
			debugfs_path);
		return 1;
	} else if (st.f_type != (long) DEBUGFS_TYPE) {
		fprintf(stderr,"%s does not appear to be a debug filesystem\n",
			debugfs_path);
		return 1;
	}

	if (open_devices() != 0)
		return 1;

	/* -k: just tear down any running trace and exit */
	if (kill_running_trace) {
		stop_all_traces();
		return 0;
	}

	ncpus = sysconf(_SC_NPROCESSORS_ONLN);
	if (ncpus < 0) {
		fprintf(stderr, "sysconf(_SC_NPROCESSORS_ONLN) failed\n");
		return 1;
	}

	/* all of these trigger an orderly shutdown */
	signal(SIGINT, handle_sigint);
	signal(SIGHUP, handle_sigint);
	signal(SIGTERM, handle_sigint);
	signal(SIGALRM, handle_sigint);

	if (net_mode == Net_client && net_setup_client())
		return 1;

	if (start_devices() != 0)
		return 1;

	atexit(stop_all_tracing);

	/* SIGALRM (handled above) ends the run after -w seconds */
	if (stop_watch)
		alarm(stop_watch);

	wait_for_threads();

	if (!is_trace_stopped()) {
		trace_stopped = 1;
		stop_all_threads();
		stop_all_traces();
	}

	show_stats(device_information, ndevs, ncpus);

	return 0;
}
1946