[PATCH] blktrace: remember to initialize mutex
[blktrace.git] / blktrace.c
index 2b358743341445446d81574ead5782821aec3f96..992b4eb9af988005aeb6e1b41f9468b0ca42f296 100644 (file)
@@ -39,6 +39,7 @@
 #include <assert.h>
 
 #include "blktrace.h"
+#include "list.h"
 
 static char blktrace_version[] = "0.99";
 
@@ -129,6 +130,13 @@ static struct option l_opts[] = {
        }
 };
 
+struct tip_subbuf {
+       struct list_head list;
+       void *buf;
+       unsigned int len;
+       unsigned int max_len;
+};
+
 struct thread_information {
        int cpu;
        pthread_t thread;
@@ -140,12 +148,18 @@ struct thread_information {
        unsigned long fd_max_size;
        char fn[MAXPATHLEN + 64];
 
-       pthread_mutex_t *fd_lock;
        FILE *ofile;
        char *ofile_buffer;
+       int ofile_stdout;
 
        unsigned long events_processed;
        struct device_information *device;
+
+       int exited;
+
+       pthread_mutex_t lock;
+       struct list_head subbuf_list;
+       struct tip_subbuf *leftover_ts;
 };
 
 struct device_information {
@@ -180,8 +194,6 @@ static volatile int trace_stopped;
 #define is_stat_shown()        (*(volatile int *)(&stat_shown))
 static volatile int stat_shown;
 
-static pthread_mutex_t stdout_mutex = PTHREAD_MUTEX_INITIALIZER;
-
 static void exit_trace(int status);
 
 #define dip_tracing(dip)       (*(volatile int *)(&(dip)->trace_started))
@@ -191,8 +203,8 @@ static void exit_trace(int status);
        for (__i = 0, __d = device_information; __i < __e; __i++, __d++)
 
 #define for_each_dip(__d, __i) __for_each_dip(__d, __i, ndevs)
-#define for_each_tip(__d, __t, __i)    \
-       for (__i = 0, __t = (__d)->threads; __i < ncpus; __i++, __t++)
+#define for_each_tip(__d, __t, __j)    \
+       for (__j = 0, __t = (__d)->threads; __j < ncpus; __j++, __t++)
 
 static int get_dropped_count(const char *buts_name)
 {
@@ -272,136 +284,131 @@ static void wait_for_data(struct thread_information *tip)
 {
        struct pollfd pfd = { .fd = tip->fd, .events = POLLIN };
 
-       poll(&pfd, 1, 10);
+       do {
+               poll(&pfd, 1, 100);
+               if (pfd.revents & POLLIN)
+                       break;
+               if (tip->ofile_stdout)
+                       break;
+       } while (!is_done());
 }
 
-static int __read_data(struct thread_information *tip, void *buf, int len,
-                      int block)
+static int read_data(struct thread_information *tip, void *buf, int len)
 {
        int ret = 0;
 
-       while (!is_done()) {
-               ret = read(tip->fd, buf, len);
-               if (ret > 0)
-                       break;
-               else if (!ret) {
-                       if (!block)
-                               break;
+       do {
+               wait_for_data(tip);
 
-                       wait_for_data(tip);
-               } else {
+               ret = read(tip->fd, buf, len);
+               if (!ret)
+                       continue;
+               else if (ret > 0)
+                       return ret;
+               else {
                        if (errno != EAGAIN) {
                                perror(tip->fn);
                                fprintf(stderr,"Thread %d failed read of %s\n",
                                        tip->cpu, tip->fn);
                                break;
                        }
-                       if (!block) {
-                               ret = 0;
-                               break;
-                       }
-
-                       wait_for_data(tip);
+                       continue;
                }
-       }
+       } while (!is_done());
 
        return ret;
 }
 
-#define can_grow_ring(tip)     ((tip)->fd_max_size < RING_MAX_NR * buf_size * buf_nr)
-
-static int resize_ringbuffer(struct thread_information *tip)
+static inline void tip_fd_unlock(struct thread_information *tip)
 {
-       if (!can_grow_ring(tip))
-               return 1;
-
-       tip->fd_buf = realloc(tip->fd_buf, 2 * tip->fd_max_size);
-
-       /*
-        * if the ring currently wraps, copy range over
-        */
-       if (tip->fd_off + tip->fd_size > tip->fd_max_size) {
-               unsigned long wrap_size = tip->fd_size - (tip->fd_max_size - tip->fd_off);
-               memmove(tip->fd_buf + tip->fd_max_size, tip->fd_buf, wrap_size);
-       }
+       pthread_mutex_unlock(&tip->lock);
+}
 
-       tip->fd_max_size <<= 1;
-       return 0;
+static inline void tip_fd_lock(struct thread_information *tip)
+{
+       pthread_mutex_lock(&tip->lock);
 }
 
-static int __refill_ringbuffer(struct thread_information *tip, int len,
-                              int block)
+static int get_subbuf(struct thread_information *tip)
 {
-       unsigned long off;
+       struct tip_subbuf *ts;
        int ret;
 
-       off = (tip->fd_size + tip->fd_off) & (tip->fd_max_size - 1);
-       if (off + len > tip->fd_max_size)
-               len = tip->fd_max_size - off;
-
-       assert(len > 0);
+       ts = malloc(sizeof(*ts));
+       ts->buf = malloc(buf_size);
+       ts->max_len = buf_size;
 
-       ret = __read_data(tip, tip->fd_buf + off, len, block);
-       if (ret < 0)
-               return -1;
+       ret = read_data(tip, ts->buf, ts->max_len);
+       if (ret > 0) {
+               ts->len = ret;
+               tip_fd_lock(tip);
+               list_add_tail(&ts->list, &tip->subbuf_list);
+               tip_fd_unlock(tip);
+               return 0;
+       }
 
-       tip->fd_size += ret;
-       return ret;
+       free(ts->buf);
+       free(ts);
+       return -1;
 }
 
-/*
- * keep filling ring until we get a short read
- */
-static void refill_ringbuffer(struct thread_information *tip, int block)
+static void close_thread(struct thread_information *tip)
 {
-       int len = buf_size;
-       int ret;
-
-       do {
-               if (len + tip->fd_size > tip->fd_max_size)
-                       resize_ringbuffer(tip);
+       if (tip->fd != -1)
+               close(tip->fd);
+       if (tip->ofile)
+               fclose(tip->ofile);
+       if (tip->ofile_buffer)
+               free(tip->ofile_buffer);
+       if (tip->fd_buf)
+               free(tip->fd_buf);
 
-               ret = __refill_ringbuffer(tip, len, block);
-       } while ((ret == len) && !is_done());
+       tip->fd = -1;
+       tip->ofile = NULL;
+       tip->ofile_buffer = NULL;
+       tip->fd_buf = NULL;
 }
 
-static int read_data(struct thread_information *tip, void *buf,
-                    unsigned int len)
+static void *thread_main(void *arg)
 {
-       unsigned int start_size, end_size;
+       struct thread_information *tip = arg;
+       pid_t pid = getpid();
+       cpu_set_t cpu_mask;
 
-       refill_ringbuffer(tip, len > tip->fd_size);
+       CPU_ZERO(&cpu_mask);
+       CPU_SET((tip->cpu), &cpu_mask);
 
-       if (len > tip->fd_size)
-               return -1;
+       if (sched_setaffinity(pid, sizeof(cpu_mask), &cpu_mask) == -1) {
+               perror("sched_setaffinity");
+               exit_trace(1);
+       }
 
-       /*
-        * see if we wrap the ring
-        */
-       start_size = len;
-       end_size = 0;
-       if (len > (tip->fd_max_size - tip->fd_off)) {
-               start_size = tip->fd_max_size - tip->fd_off;
-               end_size = len - start_size;
+       snprintf(tip->fn, sizeof(tip->fn), "%s/block/%s/trace%d",
+                       relay_path, tip->device->buts_name, tip->cpu);
+       tip->fd = open(tip->fn, O_RDONLY);
+       if (tip->fd < 0) {
+               perror(tip->fn);
+               fprintf(stderr,"Thread %d failed open of %s\n", tip->cpu,
+                       tip->fn);
+               exit_trace(1);
        }
 
-       memcpy(buf, tip->fd_buf + tip->fd_off, start_size);
-       if (end_size)
-               memcpy(buf + start_size, tip->fd_buf, end_size);
+       for (;;) {
+               if (get_subbuf(tip))
+                       break;
+       }
 
-       tip->fd_off = (tip->fd_off + len) & (tip->fd_max_size - 1);
-       tip->fd_size -= len;
-       return 0;
+       tip->exited = 1;
+       return NULL;
 }
 
-static int write_data(FILE *file, void *buf, unsigned int buf_len)
+static int write_data(struct thread_information *tip,
+                     void *buf, unsigned int buf_len)
 {
-       int ret, bytes_left;
-       char *p = buf;
+       int ret;
 
-       bytes_left = buf_len;
-       while (bytes_left > 0) {
-               ret = fwrite(p, bytes_left, 1, file);
+       while (1) {
+               ret = fwrite(buf, buf_len, 1, tip->ofile);
                if (ret == 1)
                        break;
 
@@ -411,211 +418,132 @@ static int write_data(FILE *file, void *buf, unsigned int buf_len)
                }
        }
 
-       return 0;
-}
-
-static void *extract_data(struct thread_information *tip, int nb)
-{
-       unsigned char *buf;
+       if (tip->ofile_stdout)
+               fflush(tip->ofile);
 
-       buf = malloc(nb);
-       if (!read_data(tip, buf, nb))
-               return buf;
-
-       free(buf);
-       return NULL;
+       return 0;
 }
 
-/*
- * trace may start inside 'bit' or may need to be gotten further on
- */
-static int get_event_slow(struct thread_information *tip,
-                         struct blk_io_trace *bit)
+static int flush_subbuf(struct thread_information *tip, struct tip_subbuf *ts)
 {
-       const int inc = sizeof(__u32);
-       struct blk_io_trace foo;
-       unsigned int offset;
-       void *p;
+       unsigned int offset = 0;
+       struct blk_io_trace *t;
+       int pdu_len, events = 0;
 
        /*
-        * check if trace is inside
+        * surplus from last run
         */
-       offset = 0;
-       p = bit;
-       while (offset < sizeof(*bit)) {
-               p += inc;
-               offset += inc;
-
-               memcpy(&foo, p, inc);
+       if (tip->leftover_ts) {
+               struct tip_subbuf *prev_ts = tip->leftover_ts;
 
-               if (CHECK_MAGIC(&foo))
-                       break;
-       }
+               if (prev_ts->len + ts->len > prev_ts->max_len) {
+                       prev_ts->max_len += ts->len;
+                       prev_ts->buf = realloc(prev_ts->buf, prev_ts->max_len);
+               }
 
-       /*
-        * part trace found inside, read the rest
-        */
-       if (offset < sizeof(*bit)) {
-               int good_bytes = sizeof(*bit) - offset;
+               memcpy(prev_ts->buf + prev_ts->len, ts->buf, ts->len);
+               prev_ts->len += ts->len;
 
-               memmove(bit, p, good_bytes);
-               p = (void *) bit + good_bytes;
+               free(ts->buf);
+               free(ts);
 
-               return read_data(tip, p, offset);
+               ts = prev_ts;
+               tip->leftover_ts = NULL;
        }
 
-       /*
-        * nothing found, keep looking for start of trace
-        */
-       do {
-               if (read_data(tip, bit, sizeof(bit->magic)))
+       while (offset + sizeof(*t) <= ts->len) {
+               t = ts->buf + offset;
+
+               if (verify_trace(t))
                        return -1;
-       } while (!CHECK_MAGIC(bit));
 
-       /*
-        * now get the rest of it
-        */
-       p = &bit->sequence;
-       if (read_data(tip, p, sizeof(*bit) - inc))
-               return -1;
+               pdu_len = t->pdu_len;
 
-       return 0;
-}
+               if (offset + sizeof(*t) + pdu_len > ts->len)
+                       break;
 
-/*
- * Sometimes relayfs screws us a little, if an event crosses a sub buffer
- * boundary. So keep looking forward in the trace data until an event
- * is found
- */
-static int get_event(struct thread_information *tip, struct blk_io_trace *bit)
-{
-       /*
-        * optimize for the common fast case, a full trace read that
-        * succeeds
-        */
-       if (read_data(tip, bit, sizeof(*bit)))
-               return -1;
+               trace_to_be(t);
 
-       if (CHECK_MAGIC(bit))
-               return 0;
+               if (write_data(tip, t, sizeof(*t) + pdu_len))
+                       return -1;
+
+               offset += sizeof(*t) + pdu_len;
+               tip->events_processed++;
+               events++;
+       }
 
        /*
-        * ok that didn't work, the event may start somewhere inside the
-        * trace itself
+        * leftover bytes, save them for next time
         */
-       return get_event_slow(tip, bit);
-}
+       if (offset != ts->len) {
+               tip->leftover_ts = ts;
+               ts->len -= offset;
+               memmove(ts->buf, ts->buf + offset, ts->len);
+       } else {
+               free(ts->buf);
+               free(ts);
+       }
 
-static inline void tip_fd_unlock(struct thread_information *tip)
-{
-       if (tip->fd_lock)
-               pthread_mutex_unlock(tip->fd_lock);
+       return events;
 }
 
-static inline void tip_fd_lock(struct thread_information *tip)
+static int write_tip_events(struct thread_information *tip)
 {
-       if (tip->fd_lock)
-               pthread_mutex_lock(tip->fd_lock);
-}
+       struct tip_subbuf *ts = NULL;
 
-static void close_thread(struct thread_information *tip)
-{
-       if (tip->fd != -1)
-               close(tip->fd);
-       if (tip->ofile)
-               fclose(tip->ofile);
-       if (tip->ofile_buffer)
-               free(tip->ofile_buffer);
-       if (tip->fd_buf)
-               free(tip->fd_buf);
+       tip_fd_lock(tip);
+       if (!list_empty(&tip->subbuf_list)) {
+               ts = list_entry(tip->subbuf_list.next, struct tip_subbuf, list);
+               list_del(&ts->list);
+       }
+       tip_fd_unlock(tip);
 
-       tip->fd = -1;
-       tip->ofile = NULL;
-       tip->ofile_buffer = NULL;
-       tip->fd_buf = NULL;
+       if (ts)
+               return flush_subbuf(tip, ts);
+
+       return 0;
 }
 
-static void *extract(void *arg)
+/*
+ * scans the tips we know and writes out the subbuffers we accumulate
+ */
+static void get_and_write_events(void)
 {
-       struct thread_information *tip = arg;
-       int pdu_len;
-       char *pdu_data;
-       struct blk_io_trace t;
-       pid_t pid = getpid();
-       cpu_set_t cpu_mask;
+       struct device_information *dip;
+       struct thread_information *tip;
+       int i, j, events, ret, tips_running;
 
-       CPU_ZERO(&cpu_mask);
-       CPU_SET((tip->cpu), &cpu_mask);
+       while (!is_done()) {
+               events = 0;
 
-       if (sched_setaffinity(pid, sizeof(cpu_mask), &cpu_mask) == -1) {
-               perror("sched_setaffinity");
-               exit_trace(1);
-       }
+               for_each_dip(dip, i) {
+                       for_each_tip(dip, tip, j) {
+                               ret = write_tip_events(tip);
+                               if (ret > 0)
+                                       events += ret;
+                       }
+               }
 
-       snprintf(tip->fn, sizeof(tip->fn), "%s/block/%s/trace%d",
-                       relay_path, tip->device->buts_name, tip->cpu);
-       tip->fd = open(tip->fn, O_RDONLY | O_NONBLOCK);
-       if (tip->fd < 0) {
-               perror(tip->fn);
-               fprintf(stderr,"Thread %d failed open of %s\n", tip->cpu,
-                       tip->fn);
-               exit_trace(1);
+               if (!events)
+                       usleep(10);
        }
 
        /*
-        * start with a ringbuffer that is twice the size of the kernel side
+        * reap stored events
         */
-       tip->fd_max_size = buf_size * buf_nr * RING_INIT_NR;
-       tip->fd_buf = malloc(tip->fd_max_size);
-       tip->fd_off = 0;
-       tip->fd_size = 0;
-
-       pdu_data = NULL;
-       while (1) {
-               if (get_event(tip, &t))
-                       break;
-
-               if (verify_trace(&t))
-                       break;
-
-               pdu_len = t.pdu_len;
-
-               trace_to_be(&t);
-
-               if (pdu_len) {
-                       pdu_data = extract_data(tip, pdu_len);
-                       if (!pdu_data)
-                               break;
-               }
-
-               /*
-                * now we have both trace and payload, get a lock on the
-                * output descriptor and send it off
-                */
-               tip_fd_lock(tip);
-
-               if (write_data(tip->ofile, &t, sizeof(t))) {
-                       tip_fd_unlock(tip);
-                       break;
-               }
-
-               if (pdu_data && write_data(tip->ofile, pdu_data, pdu_len)) {
-                       tip_fd_unlock(tip);
-                       break;
-               }
-
-               tip_fd_unlock(tip);
-
-               if (pdu_data) {
-                       free(pdu_data);
-                       pdu_data = NULL;
+       do {
+               events = 0;
+               tips_running = 0;
+               for_each_dip(dip, i) {
+                       for_each_tip(dip, tip, j) {
+                               ret = write_tip_events(tip);
+                               if (ret > 0)
+                                       events += ret;
+                               tips_running += !tip->exited;
+                       }
                }
-
-               tip->events_processed++;
-       }
-
-       close_thread(tip);
-       return NULL;
+               usleep(10);
+       } while (events || tips_running);
 }
 
 static int start_threads(struct device_information *dip)
@@ -623,19 +551,21 @@ static int start_threads(struct device_information *dip)
        struct thread_information *tip;
        char op[64];
        int j, pipeline = output_name && !strcmp(output_name, "-");
-       int len, mode;
+       int len, mode, vbuf_size;
 
        for_each_tip(dip, tip, j) {
                tip->cpu = j;
                tip->device = dip;
-               tip->fd_lock = NULL;
                tip->events_processed = 0;
+               pthread_mutex_init(&tip->lock, NULL);
+               INIT_LIST_HEAD(&tip->subbuf_list);
+               tip->leftover_ts = NULL;
 
                if (pipeline) {
                        tip->ofile = fdopen(STDOUT_FILENO, "w");
-                       tip->fd_lock = &stdout_mutex;
+                       tip->ofile_stdout = 1;
                        mode = _IOLBF;
-                       buf_size = 512;
+                       vbuf_size = 512;
                } else {
                        len = 0;
 
@@ -650,8 +580,9 @@ static int start_threads(struct device_information *dip)
                                        dip->buts_name, tip->cpu);
                        }
                        tip->ofile = fopen(op, "w");
+                       tip->ofile_stdout = 0;
                        mode = _IOFBF;
-                       buf_size = OFILE_BUF;
+                       vbuf_size = OFILE_BUF;
                }
 
                if (tip->ofile == NULL) {
@@ -659,14 +590,14 @@ static int start_threads(struct device_information *dip)
                        return 1;
                }
 
-               tip->ofile_buffer = malloc(buf_size);
-               if (setvbuf(tip->ofile, tip->ofile_buffer, mode, buf_size)) {
+               tip->ofile_buffer = malloc(vbuf_size);
+               if (setvbuf(tip->ofile, tip->ofile_buffer, mode, vbuf_size)) {
                        perror("setvbuf");
                        close_thread(tip);
                        return 1;
                }
 
-               if (pthread_create(&tip->thread, NULL, extract, tip)) {
+               if (pthread_create(&tip->thread, NULL, thread_main, tip)) {
                        perror("pthread_create");
                        close_thread(tip);
                        return 1;
@@ -682,8 +613,10 @@ static void stop_threads(struct device_information *dip)
        unsigned long ret;
        int i;
 
-       for_each_tip(dip, tip, i)
+       for_each_tip(dip, tip, i) {
                (void) pthread_join(tip->thread, (void *) &ret);
+               close_thread(tip);
+       }
 }
 
 static void stop_all_threads(void)
@@ -795,20 +728,20 @@ static int start_devices(void)
 
 static void show_stats(void)
 {
-       int i, j, no_stdout = 0;
        struct device_information *dip;
        struct thread_information *tip;
        unsigned long long events_processed;
        unsigned long total_drops;
+       int i, j, no_stdout = 0;
 
        if (is_stat_shown())
                return;
 
-       stat_shown = 1;
-
        if (output_name && !strcmp(output_name, "-"))
                no_stdout = 1;
 
+       stat_shown = 1;
+
        total_drops = 0;
        for_each_dip(dip, i) {
                if (!no_stdout)
@@ -852,13 +785,6 @@ static void show_usage(char *program)
 static void handle_sigint(__attribute__((__unused__)) int sig)
 {
        done = 1;
-       if (!is_trace_stopped()) {
-               trace_stopped = 1;
-               stop_all_threads();
-               stop_all_traces();
-       }
-
-       show_stats();
 }
 
 int main(int argc, char *argv[])
@@ -1001,8 +927,7 @@ int main(int argc, char *argv[])
        if (stop_watch)
                alarm(stop_watch);
 
-       while (!is_done())
-               sleep(1);
+       get_and_write_events();
 
        if (!is_trace_stopped()) {
                trace_stopped = 1;