[PATCH] blktrace: add ->ops for the read/get/flush methods
[blktrace.git] / blktrace.c
1 /*
2  * block queue tracing application
3  *
4  * Copyright (C) 2005 Jens Axboe <axboe@suse.de>
5  *
6  *  This program is free software; you can redistribute it and/or modify
7  *  it under the terms of the GNU General Public License as published by
8  *  the Free Software Foundation; either version 2 of the License, or
9  *  (at your option) any later version.
10  *
11  *  This program is distributed in the hope that it will be useful,
12  *  but WITHOUT ANY WARRANTY; without even the implied warranty of
13  *  MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
14  *  GNU General Public License for more details.
15  *
16  *  You should have received a copy of the GNU General Public License
17  *  along with this program; if not, write to the Free Software
18  *  Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA  02111-1307  USA
19  *
20  */
21 #include <pthread.h>
22 #include <sys/types.h>
23 #include <sys/stat.h>
24 #include <unistd.h>
25 #include <locale.h>
26 #include <signal.h>
27 #include <fcntl.h>
28 #include <string.h>
29 #include <sys/ioctl.h>
30 #include <sys/param.h>
31 #include <sys/statfs.h>
32 #include <sys/poll.h>
33 #include <sys/mman.h>
34 #include <sys/socket.h>
35 #include <stdio.h>
36 #include <stdlib.h>
37 #include <sched.h>
38 #include <ctype.h>
39 #include <getopt.h>
40 #include <errno.h>
41 #include <netinet/in.h>
42 #include <arpa/inet.h>
43 #include <netdb.h>
44
45 #include "blktrace.h"
46 #include "barrier.h"
47
static char blktrace_version[] = "0.99";

/*
 * Default relay buffer geometry: the initial values of buf_size and buf_nr.
 * You may want to increase this even more, if you are logging at a high
 * rate and see skipped/missed events.
 */
#define BUF_SIZE	(512 * 1024)
#define BUF_NR		(4)

/* stdio buffer size used for regular output files */
#define OFILE_BUF	(128 * 1024)

#define RELAYFS_TYPE	0xF0B4A981

/*
 * Command line options: short form in S_OPTS, long form in l_opts.
 * Each l_opts entry is { name, has_arg, flag, val } and maps onto the
 * matching S_OPTS letter; the two lists must be kept in sync.
 */
#define S_OPTS	"d:a:A:r:o:kw:Vb:n:D:lh:p:"
static struct option l_opts[] = {
	{ "dev",             required_argument, NULL, 'd' },
	{ "act-mask",        required_argument, NULL, 'a' },
	{ "set-mask",        required_argument, NULL, 'A' },
	{ "relay",           required_argument, NULL, 'r' },
	{ "output",          required_argument, NULL, 'o' },
	{ "kill",            no_argument,       NULL, 'k' },
	{ "stopwatch",       required_argument, NULL, 'w' },
	{ "version",         no_argument,       NULL, 'V' },
	{ "buffer-size",     required_argument, NULL, 'b' },
	{ "num-sub-buffers", required_argument, NULL, 'n' },
	{ "output-dir",      required_argument, NULL, 'D' },
	{ "listen",          no_argument,       NULL, 'l' },
	{ "host",            required_argument, NULL, 'h' },
	{ "port",            required_argument, NULL, 'p' },
	{ NULL,              0,                 NULL, 0   },	/* terminator */
};
151
/*
 * One chunk of trace data on the copy (pipe/network) path: get_subbuf()
 * fills it and a flush_subbuf method consumes it. The first len bytes of
 * buf are valid; max_len is the capacity flush_subbuf_file() assumes when
 * appending to a held-back leftover.
 */
152 struct tip_subbuf {
153         void *buf;
154         unsigned int len;
155         unsigned int max_len;
156 };
157
/*
 * FIFO_SIZE must remain a power of two: the fifo indices wrap with
 * "& (FIFO_SIZE - 1)". CL_SIZE is the alignment used to keep head and
 * tail on separate cache lines.
 */
158 #define FIFO_SIZE       (1024)  /* should be plenty big! */
159 #define CL_SIZE         (128)   /* cache line, any bigger? */
160
/*
 * Ring of sub-buffer pointers between one trace thread, which queues at
 * tail (subbuf_fifo_queue()), and the thread running
 * get_and_write_events(), which dequeues at head (subbuf_fifo_dequeue()).
 * One slot always stays empty, so at most FIFO_SIZE - 1 entries can be
 * pending.
 */
161 struct tip_subbuf_fifo {
162         int tail __attribute__((aligned(CL_SIZE)));
163         int head __attribute__((aligned(CL_SIZE)));
164         struct tip_subbuf *q[FIFO_SIZE];
165 };
166
/*
 * State of one trace reader: one instance per CPU per traced device.
 * thread_main() pins the thread to `cpu` and drains the relay file
 * <relay_path>/block/<buts_name>/trace<cpu>.
 */
167 struct thread_information {
168         int cpu;
169         pthread_t thread;
170
/* relay file descriptor (-1 when closed) and its path in fn */
171         int fd;
/* NOTE(review): only freed in close_thread(); allocation not visible here */
172         void *fd_buf;
173         char fn[MAXPATHLEN + 64];
174
/*
 * Output stream and its setvbuf() buffer. ofile_stdout is set for "-"
 * (pipe) output; ofile_mmap is set for regular files, which are then
 * written through mmap_subbuf().
 */
175         FILE *ofile;
176         char *ofile_buffer;
177         int ofile_stdout;
178         int ofile_mmap;
179
/* I/O methods, selected per output/network mode by fill_ops() */
180         int (*get_subbuf)(struct thread_information *, unsigned int);
181         int (*flush_subbuf)(struct thread_information *, struct tip_subbuf *);
182         int (*read_data)(struct thread_information *, void *, unsigned int);
183
/* statistics printed by show_stats(); device is the owning device */
184         unsigned long events_processed;
185         unsigned long long data_read;
186         struct device_information *device;
187
/* set by thread_main() when its read loop ends; polled by the main loop */
188         int exited;
189
190         /*
191          * piped fifo buffers
192          */
/* leftover_ts: trailing partial event held back by flush_subbuf_file() */
193         struct tip_subbuf_fifo fifo;
194         struct tip_subbuf *leftover_ts;
195
196         /*
197          * mmap controlled output files
198          */
/*
 * fs_size: bytes of trace data stored; fs_max_size: file length set via
 * ftruncate(); fs_buf/fs_buf_len: current mapping; fs_off: write offset
 * inside that mapping.
 */
199         unsigned long long fs_size;
200         unsigned long long fs_max_size;
201         unsigned long fs_off;
202         void *fs_buf;
203         unsigned long fs_buf_len;
204 };
205
/*
 * One traced block device.
 *  fd            - device node opened by open_devices(), used for the
 *                  BLKTRACE* ioctls; set to -1 by stop_trace()
 *  path          - device path as passed to resize_devices()
 *  buts_name     - trace name copied from the BLKTRACESETUP result; names
 *                  the relay directory <relay_path>/block/<buts_name>/
 *  trace_started - set by start_trace(), cleared by stop_trace()
 *  drop_count    - the kernel "dropped" counter, read by stop_all_traces()
 *  threads       - this device's ncpus-entry slice of thread_information
 */
206 struct device_information {
207         int fd;
208         char *path;
209         char buts_name[32];
210         volatile int trace_started;
211         unsigned long drop_count;
212         struct thread_information *threads;
213 };
214
/* number of per-CPU reader threads started for each traced device */
215 static int ncpus;
/* flat ndevs * ncpus array; each device's ->threads points into it */
216 static struct thread_information *thread_information;
217 static int ndevs;
/* traced devices, grown one entry at a time by resize_devices() */
218 static struct device_information *device_information;
219
220 /* command line option globals */
/* act_mask defaults to all bits set */
221 static char *relay_path;
222 static char *output_name;
223 static char *output_dir;
224 static int act_mask = ~0U;
225 static int kill_running_trace;
226 static unsigned long buf_size = BUF_SIZE;
227 static unsigned long buf_nr = BUF_NR;
/* NOTE(review): not an option; mmap_subbuf() uses it to page-align offsets */
228 static unsigned int page_size;
229
/*
 * Shared flags. done is set by handle_sigint(); the is_*() macros force a
 * fresh read on every use. NOTE(review): volatile provides no ordering
 * between threads.
 */
230 #define is_done()       (*(volatile int *)(&done))
231 static volatile int done;
232
233 #define is_trace_stopped()      (*(volatile int *)(&trace_stopped))
234 static volatile int trace_stopped;
235
236 #define is_stat_shown() (*(volatile int *)(&stat_shown))
237 static volatile int stat_shown;
238
/* non-static; presumably consumed by verify_trace() from blktrace.h - confirm */
239 int data_is_native = -1;
240
/* forward declaration: thread_main() calls exit_trace() before its definition */
241 static void exit_trace(int status);
242
243 #define dip_tracing(dip)        (*(volatile int *)(&(dip)->trace_started))
244 #define dip_set_tracing(dip, v) ((dip)->trace_started = (v))
245
/*
 * Iteration helpers: __for_each_dip visits the first __e devices,
 * for_each_dip all ndevs devices, for_each_tip the ncpus thread slots of
 * one device.
 */
246 #define __for_each_dip(__d, __i, __e)   \
247         for (__i = 0, __d = device_information; __i < __e; __i++, __d++)
248
249 #define for_each_dip(__d, __i)  __for_each_dip(__d, __i, ndevs)
250 #define for_each_tip(__d, __t, __j)     \
251         for (__j = 0, __t = (__d)->threads; __j < ncpus; __j++, __t++)
252
253 /*
254  * networking stuff follows. we include a magic number so we know whether
255  * to endianness convert or not
256  */
/*
 * flush_subbuf_net() sends this struct verbatim (native layout and byte
 * order) ahead of every sub-buffer's data.
 */
257 struct blktrace_net_hdr {
258         u32 magic;              /* same as trace magic */
259         char buts_name[32];     /* trace name */
260         u32 cpu;                /* for which cpu */
261         u32 max_cpus;
262         u32 len;                /* length of following trace data */
263 };
264
265 #define TRACE_NET_PORT          (8462)
266
/* values of net_mode; fill_ops() picks the I/O methods from it */
267 enum {
268         Net_none = 0,
269         Net_server,
270         Net_client,
271 };
272
273 /*
274  * network cmd line params
275  */
276 static char hostname[MAXHOSTNAMELEN];
277 static int net_port = TRACE_NET_PORT;
278 static int net_mode = 0;
279
/* net_in_fd: read by read_data_net(); net_out_fd: written by flush_subbuf_net() */
280 static int net_in_fd = -1;
281 static int net_out_fd = -1;
282
283 static void handle_sigint(__attribute__((__unused__)) int sig)
284 {
285         done = 1;
286 }
287
288 static int get_dropped_count(const char *buts_name)
289 {
290         int fd;
291         char tmp[MAXPATHLEN + 64];
292
293         snprintf(tmp, sizeof(tmp), "%s/block/%s/dropped",
294                  relay_path, buts_name);
295
296         fd = open(tmp, O_RDONLY);
297         if (fd < 0) {
298                 /*
299                  * this may be ok, if the kernel doesn't support dropped counts
300                  */
301                 if (errno == ENOENT)
302                         return 0;
303
304                 fprintf(stderr, "Couldn't open dropped file %s\n", tmp);
305                 return -1;
306         }
307
308         if (read(fd, tmp, sizeof(tmp)) < 0) {
309                 perror(tmp);
310                 close(fd);
311                 return -1;
312         }
313
314         close(fd);
315
316         return atoi(tmp);
317 }
318
319 static int start_trace(struct device_information *dip)
320 {
321         struct blk_user_trace_setup buts;
322
323         memset(&buts, 0, sizeof(buts));
324         buts.buf_size = buf_size;
325         buts.buf_nr = buf_nr;
326         buts.act_mask = act_mask;
327
328         if (ioctl(dip->fd, BLKTRACESETUP, &buts) < 0) {
329                 perror("BLKTRACESETUP");
330                 return 1;
331         }
332
333         if (ioctl(dip->fd, BLKTRACESTART) < 0) {
334                 perror("BLKTRACESTART");
335                 return 1;
336         }
337
338         memcpy(dip->buts_name, buts.name, sizeof(dip->buts_name));
339         dip_set_tracing(dip, 1);
340         return 0;
341 }
342
343 static void stop_trace(struct device_information *dip)
344 {
345         if (dip_tracing(dip) || kill_running_trace) {
346                 dip_set_tracing(dip, 0);
347
348                 if (ioctl(dip->fd, BLKTRACESTOP) < 0)
349                         perror("BLKTRACESTOP");
350                 if (ioctl(dip->fd, BLKTRACETEARDOWN) < 0)
351                         perror("BLKTRACETEARDOWN");
352
353                 close(dip->fd);
354                 dip->fd = -1;
355         }
356 }
357
358 static void stop_all_traces(void)
359 {
360         struct device_information *dip;
361         int i;
362
363         for_each_dip(dip, i) {
364                 dip->drop_count = get_dropped_count(dip->buts_name);
365                 stop_trace(dip);
366         }
367 }
368
369 static void wait_for_data(struct thread_information *tip)
370 {
371         struct pollfd pfd = { .fd = tip->fd, .events = POLLIN };
372
373         do {
374                 poll(&pfd, 1, 100);
375                 if (pfd.revents & POLLIN)
376                         break;
377                 if (tip->ofile_stdout)
378                         break;
379         } while (!is_done());
380 }
381
382 static int read_data_file(struct thread_information *tip, void *buf,
383                           unsigned int len)
384 {
385         int ret = 0;
386
387         do {
388                 wait_for_data(tip);
389
390                 ret = read(tip->fd, buf, len);
391                 if (!ret)
392                         continue;
393                 else if (ret > 0)
394                         return ret;
395                 else {
396                         if (errno != EAGAIN) {
397                                 perror(tip->fn);
398                                 fprintf(stderr,"Thread %d failed read of %s\n",
399                                         tip->cpu, tip->fn);
400                                 break;
401                         }
402                         continue;
403                 }
404         } while (!is_done());
405
406         return ret;
407
408 }
409
410 static int read_data_net(struct thread_information *tip, void *buf,
411                          unsigned int len)
412 {
413         unsigned int bytes_left = len;
414         int ret = 0;
415
416         do {
417                 ret = recv(net_in_fd, buf, bytes_left, MSG_WAITALL);
418
419                 if (!ret)
420                         continue;
421                 else if (ret < 0) {
422                         if (errno != EAGAIN) {
423                                 perror(tip->fn);
424                                 fprintf(stderr, "server: failed read\n");
425                                 return 0;
426                         }
427                         continue;
428                 } else {
429                         buf += ret;
430                         bytes_left -= ret;
431                 }
432         } while (!is_done() && bytes_left);
433
434         return len - bytes_left;
435 }
436
437 static int read_data(struct thread_information *tip, void *buf,
438                      unsigned int len)
439 {
440         int ret = tip->read_data(tip, buf, len);
441
442         if (ret > 0)
443                 tip->data_read += ret;
444
445         return ret;
446 }
447
448 static inline struct tip_subbuf *
449 subbuf_fifo_dequeue(struct thread_information *tip)
450 {
451         const int head = tip->fifo.head;
452         const int next = (head + 1) & (FIFO_SIZE - 1);
453
454         if (head != tip->fifo.tail) {
455                 struct tip_subbuf *ts = tip->fifo.q[head];
456
457                 store_barrier();
458                 tip->fifo.head = next;
459                 return ts;
460         }
461
462         return NULL;
463 }
464
465 static inline int subbuf_fifo_queue(struct thread_information *tip,
466                                     struct tip_subbuf *ts)
467 {
468         const int tail = tip->fifo.tail;
469         const int next = (tail + 1) & (FIFO_SIZE - 1);
470
471         if (next != tip->fifo.head) {
472                 tip->fifo.q[tail] = ts;
473                 store_barrier();
474                 tip->fifo.tail = next;
475                 return 0;
476         }
477
478         fprintf(stderr, "fifo too small!\n");
479         return 1;
480 }
481
482 /*
483  * For file output, truncate and mmap the file appropriately
484  */
485 static int mmap_subbuf(struct thread_information *tip, unsigned int maxlen)
486 {
487         int ofd = fileno(tip->ofile);
488         int ret;
489
490         /*
491          * extend file, if we have to. use chunks of 16 subbuffers.
492          */
493         if (tip->fs_off + buf_size > tip->fs_buf_len) {
494                 if (tip->fs_buf) {
495                         munlock(tip->fs_buf, tip->fs_buf_len);
496                         munmap(tip->fs_buf, tip->fs_buf_len);
497                         tip->fs_buf = NULL;
498                 }
499
500                 tip->fs_off = tip->fs_size & (page_size - 1);
501                 tip->fs_buf_len = (16 * buf_size) - tip->fs_off;
502                 tip->fs_max_size += tip->fs_buf_len;
503
504                 if (ftruncate(ofd, tip->fs_max_size) < 0) {
505                         perror("ftruncate");
506                         return -1;
507                 }
508
509                 tip->fs_buf = mmap(NULL, tip->fs_buf_len, PROT_WRITE,
510                                    MAP_SHARED, ofd, tip->fs_size - tip->fs_off);
511                 if (tip->fs_buf == MAP_FAILED) {
512                         perror("mmap");
513                         return -1;
514                 }
515                 mlock(tip->fs_buf, tip->fs_buf_len);
516         }
517
518         ret = read_data(tip, tip->fs_buf + tip->fs_off, maxlen);
519         if (ret >= 0) {
520                 tip->fs_size += ret;
521                 tip->fs_off += ret;
522                 return 0;
523         }
524
525         return -1;
526 }
527
528 /*
529  * Use the copy approach for pipes and network
530  */
531 static int get_subbuf(struct thread_information *tip, unsigned int maxlen)
532 {
533         struct tip_subbuf *ts = malloc(sizeof(*ts));
534         int ret;
535
536         ts->buf = malloc(buf_size);
537         ts->max_len = maxlen;
538
539         ret = read_data(tip, ts->buf, ts->max_len);
540         if (ret > 0) {
541                 ts->len = ret;
542                 return subbuf_fifo_queue(tip, ts);
543         }
544
545         return ret;
546 }
547
548 static void close_thread(struct thread_information *tip)
549 {
550         if (tip->fd != -1)
551                 close(tip->fd);
552         if (tip->ofile)
553                 fclose(tip->ofile);
554         if (tip->ofile_buffer)
555                 free(tip->ofile_buffer);
556         if (tip->fd_buf)
557                 free(tip->fd_buf);
558
559         tip->fd = -1;
560         tip->ofile = NULL;
561         tip->ofile_buffer = NULL;
562         tip->fd_buf = NULL;
563 }
564
565 static void tip_ftrunc_final(struct thread_information *tip)
566 {
567         /*
568          * truncate to right size and cleanup mmap
569          */
570         if (tip->ofile_mmap) {
571                 int ofd = fileno(tip->ofile);
572
573                 if (tip->fs_buf)
574                         munmap(tip->fs_buf, tip->fs_buf_len);
575
576                 ftruncate(ofd, tip->fs_size);
577         }
578 }
579
580 static void *thread_main(void *arg)
581 {
582         struct thread_information *tip = arg;
583         pid_t pid = getpid();
584         cpu_set_t cpu_mask;
585
586         CPU_ZERO(&cpu_mask);
587         CPU_SET((tip->cpu), &cpu_mask);
588
589         if (sched_setaffinity(pid, sizeof(cpu_mask), &cpu_mask) == -1) {
590                 perror("sched_setaffinity");
591                 exit_trace(1);
592         }
593
594         snprintf(tip->fn, sizeof(tip->fn), "%s/block/%s/trace%d",
595                         relay_path, tip->device->buts_name, tip->cpu);
596         tip->fd = open(tip->fn, O_RDONLY);
597         if (tip->fd < 0) {
598                 perror(tip->fn);
599                 fprintf(stderr,"Thread %d failed open of %s\n", tip->cpu,
600                         tip->fn);
601                 exit_trace(1);
602         }
603
604         while (!is_done()) {
605                 if (tip->get_subbuf(tip, buf_size))
606                         break;
607         }
608
609         tip_ftrunc_final(tip);
610         tip->exited = 1;
611         return NULL;
612 }
613
/*
 * Send all @buf_len bytes of @buf on socket @fd, looping over partial
 * sends.
 *
 * Returns 0 once everything is sent, 1 if send() fails (error printed).
 */
static int write_data_net(int fd, void *buf, unsigned int buf_len)
{
	char *pos = buf;
	char *const end = pos + buf_len;

	while (pos < end) {
		int sent = send(fd, pos, end - pos, 0);

		if (sent < 0) {
			perror("send");
			return 1;
		}
		pos += sent;
	}

	return 0;
}
632
633 static int flush_subbuf_net(struct thread_information *tip,
634                             struct tip_subbuf *ts)
635 {
636         struct blktrace_net_hdr hdr;
637
638         hdr.magic = BLK_IO_TRACE_MAGIC;
639         strcpy(hdr.buts_name, tip->device->buts_name);
640         hdr.cpu = tip->cpu;
641         hdr.max_cpus = ncpus;
642         hdr.len = ts->len;
643
644         if (write_data_net(net_out_fd, &hdr, sizeof(hdr)))
645                 return 1;
646
647         if (write_data_net(net_out_fd, ts->buf, ts->len))
648                 return 1;
649
650         free(ts->buf);
651         free(ts);
652         return 0;
653 }
654
655 static int write_data(struct thread_information *tip, void *buf,
656                       unsigned int buf_len)
657 {
658         int ret;
659
660         if (!buf_len)
661                 return 0;
662
663         while (1) {
664                 ret = fwrite(buf, buf_len, 1, tip->ofile);
665                 if (ret == 1)
666                         break;
667
668                 if (ret < 0) {
669                         perror("write");
670                         return 1;
671                 }
672         }
673
674         if (tip->ofile_stdout)
675                 fflush(tip->ofile);
676
677         return 0;
678 }
679
680 static int flush_subbuf_file(struct thread_information *tip,
681                              struct tip_subbuf *ts)
682 {
683         unsigned int offset = 0;
684         struct blk_io_trace *t;
685         int pdu_len, events = 0;
686
687         /*
688          * surplus from last run
689          */
690         if (tip->leftover_ts) {
691                 struct tip_subbuf *prev_ts = tip->leftover_ts;
692
693                 if (prev_ts->len + ts->len > prev_ts->max_len) {
694                         prev_ts->max_len += ts->len;
695                         prev_ts->buf = realloc(prev_ts->buf, prev_ts->max_len);
696                 }
697
698                 memcpy(prev_ts->buf + prev_ts->len, ts->buf, ts->len);
699                 prev_ts->len += ts->len;
700
701                 free(ts->buf);
702                 free(ts);
703
704                 ts = prev_ts;
705                 tip->leftover_ts = NULL;
706         }
707
708         while (offset + sizeof(*t) <= ts->len) {
709                 t = ts->buf + offset;
710
711                 if (verify_trace(t)) {
712                         write_data(tip, ts->buf, offset);
713                         return -1;
714                 }
715
716                 pdu_len = t->pdu_len;
717
718                 if (offset + sizeof(*t) + pdu_len > ts->len)
719                         break;
720
721                 offset += sizeof(*t) + pdu_len;
722                 tip->events_processed++;
723                 tip->data_read += sizeof(*t) + pdu_len;
724                 events++;
725         }
726
727         if (write_data(tip, ts->buf, offset))
728                 return -1;
729
730         /*
731          * leftover bytes, save them for next time
732          */
733         if (offset != ts->len) {
734                 tip->leftover_ts = ts;
735                 ts->len -= offset;
736                 memmove(ts->buf, ts->buf + offset, ts->len);
737         } else {
738                 free(ts->buf);
739                 free(ts);
740         }
741
742         return events;
743 }
744
745 static int write_tip_events(struct thread_information *tip)
746 {
747         struct tip_subbuf *ts = subbuf_fifo_dequeue(tip);
748
749         if (ts)
750                 return tip->flush_subbuf(tip, ts);
751
752         return 0;
753 }
754
755 /*
756  * scans the tips we know and writes out the subbuffers we accumulate
757  */
758 static void get_and_write_events(void)
759 {
760         struct device_information *dip;
761         struct thread_information *tip;
762         int i, j, events, ret, tips_running;
763
764         while (!is_done()) {
765                 events = 0;
766
767                 for_each_dip(dip, i) {
768                         for_each_tip(dip, tip, j) {
769                                 ret = write_tip_events(tip);
770                                 if (ret > 0)
771                                         events += ret;
772                         }
773                 }
774
775                 if (!events)
776                         usleep(10);
777         }
778
779         /*
780          * reap stored events
781          */
782         do {
783                 events = 0;
784                 tips_running = 0;
785                 for_each_dip(dip, i) {
786                         for_each_tip(dip, tip, j) {
787                                 ret = write_tip_events(tip);
788                                 if (ret > 0)
789                                         events += ret;
790                                 tips_running += !tip->exited;
791                         }
792                 }
793                 usleep(10);
794         } while (events || tips_running);
795 }
796
797 static void wait_for_threads(void)
798 {
799         /*
800          * for piped or network output, poll and fetch data for writeout.
801          * for files, we just wait around for trace threads to exit
802          */
803         if ((output_name && !strcmp(output_name, "-")) ||
804             net_mode == Net_client)
805                 get_and_write_events();
806         else {
807                 struct device_information *dip;
808                 struct thread_information *tip;
809                 int i, j, tips_running;
810
811                 do {
812                         tips_running = 0;
813                         usleep(1000);
814
815                         for_each_dip(dip, i)
816                                 for_each_tip(dip, tip, j)
817                                         tips_running += !tip->exited;
818                 } while (tips_running);
819         }
820 }
821
822 static void fill_ofname(char *dst, char *buts_name, int cpu)
823 {
824         int len = 0;
825
826         if (output_dir)
827                 len = sprintf(dst, "%s/", output_dir);
828
829         if (output_name)
830                 sprintf(dst + len, "%s.blktrace.%d", output_name, cpu);
831         else
832                 sprintf(dst + len, "%s.blktrace.%d", buts_name, cpu);
833 }
834
835 static void fill_ops(struct thread_information *tip)
836 {
837         /*
838          * setup ops
839          */
840         if (tip->ofile_mmap && net_mode != Net_client)
841                 tip->get_subbuf = mmap_subbuf;
842         else
843                 tip->get_subbuf = get_subbuf;
844
845         if (net_mode == Net_client)
846                 tip->flush_subbuf = flush_subbuf_net;
847         else
848                 tip->flush_subbuf = flush_subbuf_file;
849
850         if (net_mode == Net_server)
851                 tip->read_data = read_data_net;
852         else
853                 tip->read_data = read_data_file;
854 }
855
856 static int start_threads(struct device_information *dip)
857 {
858         struct thread_information *tip;
859         int j, pipeline = output_name && !strcmp(output_name, "-");
860         int mode, vbuf_size;
861         char op[64];
862
863         for_each_tip(dip, tip, j) {
864                 tip->cpu = j;
865                 tip->device = dip;
866                 tip->events_processed = 0;
867                 memset(&tip->fifo, 0, sizeof(tip->fifo));
868                 tip->leftover_ts = NULL;
869
870                 if (pipeline) {
871                         tip->ofile = fdopen(STDOUT_FILENO, "w");
872                         tip->ofile_stdout = 1;
873                         tip->ofile_mmap = 0;
874                         mode = _IOLBF;
875                         vbuf_size = 512;
876                 } else {
877                         fill_ofname(op, dip->buts_name, tip->cpu);
878                         tip->ofile = fopen(op, "w+");
879                         tip->ofile_stdout = 0;
880                         tip->ofile_mmap = 1;
881                         mode = _IOFBF;
882                         vbuf_size = OFILE_BUF;
883                 }
884
885                 if (tip->ofile == NULL) {
886                         perror(op);
887                         return 1;
888                 }
889
890                 tip->ofile_buffer = malloc(vbuf_size);
891                 if (setvbuf(tip->ofile, tip->ofile_buffer, mode, vbuf_size)) {
892                         perror("setvbuf");
893                         close_thread(tip);
894                         return 1;
895                 }
896
897                 fill_ops(tip);
898
899                 if (pthread_create(&tip->thread, NULL, thread_main, tip)) {
900                         perror("pthread_create");
901                         close_thread(tip);
902                         return 1;
903                 }
904         }
905
906         return 0;
907 }
908
909 static void stop_threads(struct device_information *dip)
910 {
911         struct thread_information *tip;
912         unsigned long ret;
913         int i;
914
915         for_each_tip(dip, tip, i) {
916                 (void) pthread_join(tip->thread, (void *) &ret);
917                 close_thread(tip);
918         }
919 }
920
921 static void stop_all_threads(void)
922 {
923         struct device_information *dip;
924         int i;
925
926         for_each_dip(dip, i)
927                 stop_threads(dip);
928 }
929
930 static void stop_all_tracing(void)
931 {
932         struct device_information *dip;
933         int i;
934
935         for_each_dip(dip, i)
936                 stop_trace(dip);
937 }
938
939 static void exit_trace(int status)
940 {
941         if (!is_trace_stopped()) {
942                 trace_stopped = 1;
943                 stop_all_threads();
944                 stop_all_tracing();
945         }
946
947         exit(status);
948 }
949
950 static int resize_devices(char *path)
951 {
952         int size = (ndevs + 1) * sizeof(struct device_information);
953
954         device_information = realloc(device_information, size);
955         if (!device_information) {
956                 fprintf(stderr, "Out of memory, device %s (%d)\n", path, size);
957                 return 1;
958         }
959         device_information[ndevs].path = path;
960         ndevs++;
961         return 0;
962 }
963
964 static int open_devices(void)
965 {
966         struct device_information *dip;
967         int i;
968
969         for_each_dip(dip, i) {
970                 dip->fd = open(dip->path, O_RDONLY | O_NONBLOCK);
971                 if (dip->fd < 0) {
972                         perror(dip->path);
973                         return 1;
974                 }
975         }
976
977         return 0;
978 }
979
980 static int start_devices(void)
981 {
982         struct device_information *dip;
983         int i, j, size;
984
985         size = ncpus * sizeof(struct thread_information);
986         thread_information = malloc(size * ndevs);
987         if (!thread_information) {
988                 fprintf(stderr, "Out of memory, threads (%d)\n", size * ndevs);
989                 return 1;
990         }
991
992         for_each_dip(dip, i) {
993                 if (start_trace(dip)) {
994                         close(dip->fd);
995                         fprintf(stderr, "Failed to start trace on %s\n",
996                                 dip->path);
997                         break;
998                 }
999         }
1000
1001         if (i != ndevs) {
1002                 __for_each_dip(dip, j, i)
1003                         stop_trace(dip);
1004
1005                 return 1;
1006         }
1007
1008         for_each_dip(dip, i) {
1009                 dip->threads = thread_information + (i * ncpus);
1010                 if (start_threads(dip)) {
1011                         fprintf(stderr, "Failed to start worker threads\n");
1012                         break;
1013                 }
1014         }
1015
1016         if (i != ndevs) {
1017                 __for_each_dip(dip, j, i)
1018                         stop_threads(dip);
1019                 for_each_dip(dip, i)
1020                         stop_trace(dip);
1021
1022                 return 1;
1023         }
1024
1025         return 0;
1026 }
1027
1028 static void show_stats(void)
1029 {
1030         struct device_information *dip;
1031         struct thread_information *tip;
1032         unsigned long long events_processed, data_read;
1033         unsigned long total_drops;
1034         int i, j, no_stdout = 0;
1035
1036         if (is_stat_shown())
1037                 return;
1038
1039         if (output_name && !strcmp(output_name, "-"))
1040                 no_stdout = 1;
1041
1042         stat_shown = 1;
1043
1044         total_drops = 0;
1045         for_each_dip(dip, i) {
1046                 if (!no_stdout)
1047                         printf("Device: %s\n", dip->path);
1048                 events_processed = 0;
1049                 data_read = 0;
1050                 for_each_tip(dip, tip, j) {
1051                         if (!no_stdout)
1052                                 printf("  CPU%3d: %20lu events, %8llu KiB data\n",
1053                                         tip->cpu, tip->events_processed,
1054                                         tip->data_read >> 10);
1055                         events_processed += tip->events_processed;
1056                         data_read += tip->data_read;
1057                 }
1058                 total_drops += dip->drop_count;
1059                 if (!no_stdout)
1060                         printf("  Total:  %20llu events (dropped %lu), %8llu KiB data\n",
1061                                         events_processed, dip->drop_count,
1062                                         data_read >> 10);
1063         }
1064
1065         if (total_drops)
1066                 fprintf(stderr, "You have dropped events, consider using a larger buffer size (-b)\n");
1067 }
1068
1069 static struct device_information *net_get_dip(char *buts_name)
1070 {
1071         struct device_information *dip;
1072         int i;
1073
1074         for (i = 0; i < ndevs; i++) {
1075                 dip = &device_information[i];
1076
1077                 if (!strcmp(dip->buts_name, buts_name))
1078                         return dip;
1079         }
1080
1081         device_information = realloc(device_information, (ndevs + 1) * sizeof(*dip));
1082         dip = &device_information[ndevs];
1083         strcpy(dip->buts_name, buts_name);
1084         strcpy(dip->path, buts_name);
1085         ndevs++;
1086         dip->threads = malloc(ncpus * sizeof(struct thread_information));
1087         memset(dip->threads, 0, ncpus * sizeof(struct thread_information));
1088
1089         /*
1090          * open all files
1091          */
1092         for (i = 0; i < ncpus; i++) {
1093                 struct thread_information *tip = &dip->threads[i];
1094                 char op[64];
1095
1096                 tip->cpu = i;
1097                 tip->ofile_stdout = 0;
1098                 tip->ofile_mmap = 1;
1099                 tip->device = dip;
1100
1101                 fill_ofname(op, dip->buts_name, tip->cpu);
1102
1103                 tip->ofile = fopen(op, "w+");
1104                 if (!tip->ofile) {
1105                         perror("fopen");
1106                         return NULL;
1107                 }
1108         }
1109
1110         return dip;
1111 }
1112
1113 static struct thread_information *net_get_tip(struct blktrace_net_hdr *bnh)
1114 {
1115         struct device_information *dip;
1116
1117         ncpus = bnh->max_cpus;
1118         dip = net_get_dip(bnh->buts_name);
1119         return &dip->threads[bnh->cpu];
1120 }
1121
1122 static int net_get_header(struct blktrace_net_hdr *bnh)
1123 {
1124         int fl = fcntl(net_in_fd, F_GETFL);
1125         int bytes_left, ret;
1126         void *p = bnh;
1127
1128         fcntl(net_in_fd, F_SETFL, fl | O_NONBLOCK);
1129         bytes_left = sizeof(*bnh);
1130         while (bytes_left && !is_done()) {
1131                 ret = recv(net_in_fd, p, bytes_left, MSG_WAITALL);
1132                 if (ret < 0) {
1133                         if (errno != EAGAIN) {
1134                                 perror("recv header");
1135                                 return 1;
1136                         }
1137                         usleep(100);
1138                         continue;
1139                 } else if (!ret) {
1140                         usleep(100);
1141                         continue;
1142                 } else {
1143                         p += ret;
1144                         bytes_left -= ret;
1145                 }
1146         }
1147         fcntl(net_in_fd, F_SETFL, fl & ~O_NONBLOCK);
1148         return 0;
1149 }
1150
1151 static int net_server_loop(void)
1152 {
1153         struct thread_information *tip;
1154         struct blktrace_net_hdr bnh;
1155
1156         if (net_get_header(&bnh))
1157                 return 1;
1158
1159         if (data_is_native == -1 && check_data_endianness(bnh.magic)) {
1160                 fprintf(stderr, "server: received data is bad\n");
1161                 return 1;
1162         }
1163
1164         if (!data_is_native) {
1165                 bnh.cpu = be32_to_cpu(bnh.cpu);
1166                 bnh.len = be32_to_cpu(bnh.len);
1167         }
1168
1169         tip = net_get_tip(&bnh);
1170         if (!tip)
1171                 return 1;
1172
1173         if (mmap_subbuf(tip, bnh.len))
1174                 return 1;
1175
1176         return 0;
1177 }
1178
1179 /*
1180  * Start here when we are in server mode - just fetch data from the network
1181  * and dump to files
1182  */
1183 static int net_server(void)
1184 {
1185         struct sockaddr_in addr;
1186         socklen_t socklen;
1187         int fd, opt, i, j;
1188
1189         fd = socket(AF_INET, SOCK_STREAM, 0);
1190         if (fd < 0) {
1191                 perror("server: socket");
1192                 return 1;
1193         }
1194
1195         opt = 1;
1196         if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) {
1197                 perror("setsockopt");
1198                 return 1;
1199         }
1200
1201         memset(&addr, 0, sizeof(addr));
1202         addr.sin_family = AF_INET;
1203         addr.sin_addr.s_addr = htonl(INADDR_ANY);
1204         addr.sin_port = htons(net_port);
1205
1206         if (bind(fd, (struct sockaddr *) &addr, sizeof(addr)) < 0) {
1207                 perror("bind");
1208                 return 1;
1209         }
1210
1211         if (listen(fd, 1) < 0) {
1212                 perror("listen");
1213                 return 1;
1214         }
1215
1216         printf("blktrace: waiting for incoming connection...\n");
1217
1218         socklen = sizeof(addr);
1219         net_in_fd = accept(fd, (struct sockaddr *) &addr, &socklen);
1220         if (net_in_fd < 0) {
1221                 perror("accept");
1222                 return 1;
1223         }
1224
1225         signal(SIGINT, handle_sigint);
1226         signal(SIGHUP, handle_sigint);
1227         signal(SIGTERM, handle_sigint);
1228         signal(SIGALRM, handle_sigint);
1229
1230         printf("blktrace: connected!\n");
1231
1232         while (!is_done()) {
1233                 if (net_server_loop())
1234                         break;
1235         }
1236
1237         for (i = 0; i < ndevs; i++) {
1238                 struct device_information *dip = &device_information[i];
1239
1240                 for (j = 0; j < ncpus; j++)
1241                         tip_ftrunc_final(&dip->threads[j]);
1242         }
1243
1244         show_stats();
1245         return 0;
1246 }
1247
1248 /*
1249  * Setup outgoing network connection where we will transmit data
1250  */
1251 static int net_setup_client(void)
1252 {
1253         struct sockaddr_in addr;
1254         int fd;
1255
1256         fd = socket(AF_INET, SOCK_STREAM, 0);
1257         if (fd < 0) {
1258                 perror("client: socket");
1259                 return 1;
1260         }
1261
1262         memset(&addr, 0, sizeof(addr));
1263         addr.sin_family = AF_INET;
1264         addr.sin_port = htons(net_port);
1265
1266         if (inet_aton(hostname, &addr.sin_addr) != 1) {
1267                 struct hostent *hent = gethostbyname(hostname);
1268                 if (!hent) {
1269                         perror("gethostbyname");
1270                         return 1;
1271                 }
1272
1273                 memcpy(&addr.sin_addr, hent->h_addr, 4);
1274                 strcpy(hostname, hent->h_name);
1275         }
1276
1277         printf("blktrace: connecting to %s\n", hostname);
1278
1279         if (connect(fd, (struct sockaddr *) &addr, sizeof(addr)) < 0) {
1280                 perror("client: connect");
1281                 return 1;
1282         }
1283
1284         printf("blktrace: connected!\n");
1285         net_out_fd = fd;
1286         return 0;
1287 }
1288
/*
 * Option summary printed by show_usage(). Keep in sync with S_OPTS and the
 * option switch in main(): the version flag is -V (upper case), and -l,
 * -h and -p select the network server/client modes and port.
 */
static char usage_str[] =
	"-d <dev> [ -r relay path ] [ -o <output> ] [ -D <dir> ] [ -k ] [ -w time ]\n"
	"[ -a action ] [ -A action mask ] [ -b size ] [ -n number ]\n"
	"[ -l | -h <hostname> ] [ -p <port> ] [ -V ]\n\n"
	"\t-d Use specified device. May also be given last after options\n"
	"\t-r Path to mounted relayfs, defaults to /relay\n"
	"\t-o File(s) to send output to\n"
	"\t-D Directory to prepend to output file names\n"
	"\t-k Kill a running trace\n"
	"\t-w Stop after defined time, in seconds\n"
	"\t-a Only trace specified actions. See documentation\n"
	"\t-A Give trace mask as a single value. See documentation\n"
	"\t-b Sub buffer size in KiB\n"
	"\t-n Number of sub buffers\n"
	"\t-l Run in network listen mode (blktrace server)\n"
	"\t-h Run in network client mode, connecting to the given host\n"
	"\t-p Network port to use\n"
	"\t-V Print program version info\n\n";
1303
1304 static void show_usage(char *program)
1305 {
1306         fprintf(stderr, "Usage: %s %s %s",program, blktrace_version, usage_str);
1307 }
1308
1309 int main(int argc, char *argv[])
1310 {
1311         static char default_relay_path[] = "/relay";
1312         struct statfs st;
1313         int i, c;
1314         int stop_watch = 0;
1315         int act_mask_tmp = 0;
1316
1317         while ((c = getopt_long(argc, argv, S_OPTS, l_opts, NULL)) >= 0) {
1318                 switch (c) {
1319                 case 'a':
1320                         i = find_mask_map(optarg);
1321                         if (i < 0) {
1322                                 fprintf(stderr,"Invalid action mask %s\n",
1323                                         optarg);
1324                                 return 1;
1325                         }
1326                         act_mask_tmp |= i;
1327                         break;
1328
1329                 case 'A':
1330                         if ((sscanf(optarg, "%x", &i) != 1) || 
1331                                                         !valid_act_opt(i)) {
1332                                 fprintf(stderr,
1333                                         "Invalid set action mask %s/0x%x\n",
1334                                         optarg, i);
1335                                 return 1;
1336                         }
1337                         act_mask_tmp = i;
1338                         break;
1339
1340                 case 'd':
1341                         if (resize_devices(optarg) != 0)
1342                                 return 1;
1343                         break;
1344
1345                 case 'r':
1346                         relay_path = optarg;
1347                         break;
1348
1349                 case 'o':
1350                         output_name = optarg;
1351                         break;
1352                 case 'k':
1353                         kill_running_trace = 1;
1354                         break;
1355                 case 'w':
1356                         stop_watch = atoi(optarg);
1357                         if (stop_watch <= 0) {
1358                                 fprintf(stderr,
1359                                         "Invalid stopwatch value (%d secs)\n",
1360                                         stop_watch);
1361                                 return 1;
1362                         }
1363                         break;
1364                 case 'V':
1365                         printf("%s version %s\n", argv[0], blktrace_version);
1366                         return 0;
1367                 case 'b':
1368                         buf_size = strtoul(optarg, NULL, 10);
1369                         if (buf_size <= 0 || buf_size > 16*1024) {
1370                                 fprintf(stderr,
1371                                         "Invalid buffer size (%lu)\n",buf_size);
1372                                 return 1;
1373                         }
1374                         buf_size <<= 10;
1375                         break;
1376                 case 'n':
1377                         buf_nr = strtoul(optarg, NULL, 10);
1378                         if (buf_nr <= 0) {
1379                                 fprintf(stderr,
1380                                         "Invalid buffer nr (%lu)\n", buf_nr);
1381                                 return 1;
1382                         }
1383                         break;
1384                 case 'D':
1385                         output_dir = optarg;
1386                         break;
1387                 case 'h':
1388                         net_mode = Net_client;
1389                         strcpy(hostname, optarg);
1390                         break;
1391                 case 'l':
1392                         net_mode = Net_server;
1393                         break;
1394                 case 'p':
1395                         net_port = atoi(optarg);
1396                         break;
1397                 default:
1398                         show_usage(argv[0]);
1399                         return 1;
1400                 }
1401         }
1402
1403         setlocale(LC_NUMERIC, "en_US");
1404
1405         page_size = getpagesize();
1406
1407         if (net_mode == Net_server)
1408                 return net_server();
1409
1410         while (optind < argc) {
1411                 if (resize_devices(argv[optind++]) != 0)
1412                         return 1;
1413         }
1414
1415         if (ndevs == 0) {
1416                 show_usage(argv[0]);
1417                 return 1;
1418         }
1419
1420         if (!relay_path)
1421                 relay_path = default_relay_path;
1422
1423         if (act_mask_tmp != 0)
1424                 act_mask = act_mask_tmp;
1425
1426         if (statfs(relay_path, &st) < 0) {
1427                 perror("statfs");
1428                 fprintf(stderr,"%s does not appear to be a valid path\n",
1429                         relay_path);
1430                 return 1;
1431         } else if (st.f_type != (long) RELAYFS_TYPE) {
1432                 fprintf(stderr,"%s does not appear to be a relay filesystem\n",
1433                         relay_path);
1434                 return 1;
1435         }
1436
1437         if (open_devices() != 0)
1438                 return 1;
1439
1440         if (kill_running_trace) {
1441                 stop_all_traces();
1442                 return 0;
1443         }
1444
1445         ncpus = sysconf(_SC_NPROCESSORS_ONLN);
1446         if (ncpus < 0) {
1447                 fprintf(stderr, "sysconf(_SC_NPROCESSORS_ONLN) failed\n");
1448                 return 1;
1449         }
1450
1451         signal(SIGINT, handle_sigint);
1452         signal(SIGHUP, handle_sigint);
1453         signal(SIGTERM, handle_sigint);
1454         signal(SIGALRM, handle_sigint);
1455
1456         if (net_mode == Net_client && net_setup_client())
1457                 return 1;
1458
1459         if (start_devices() != 0)
1460                 return 1;
1461
1462         atexit(stop_all_tracing);
1463
1464         if (stop_watch)
1465                 alarm(stop_watch);
1466
1467         wait_for_threads();
1468
1469         if (!is_trace_stopped()) {
1470                 trace_stopped = 1;
1471                 stop_all_threads();
1472                 stop_all_traces();
1473         }
1474
1475         show_stats();
1476
1477         return 0;
1478 }
1479