From: Jens Axboe
Date: Fri, 27 Jan 2006 13:39:35 +0000 (+0100)
Subject: [PATCH] blktrace: completely rewrite the event pull/pushing loops
X-Git-Tag: blktrace-0.99.1~106
X-Git-Url: https://git.kernel.dk/?a=commitdiff_plain;h=9db17354639022d75377c81811242d0cc10d87f9;p=blktrace.git

[PATCH] blktrace: completely rewrite the event pull/pushing loops
---

diff --git a/blktrace.c b/blktrace.c
index f9d87eb..3185684 100644
--- a/blktrace.c
+++ b/blktrace.c
@@ -39,6 +39,7 @@
 #include
 
 #include "blktrace.h"
+#include "list.h"
 
 static char blktrace_version[] = "0.99";
 
@@ -129,6 +130,13 @@ static struct option l_opts[] = {
 	}
 };
 
+struct tip_subbuf {
+	struct list_head list;
+	void *buf;
+	int len;
+	int max_len;
+};
+
 struct thread_information {
 	int cpu;
 	pthread_t thread;
@@ -140,13 +148,20 @@ struct thread_information {
 	unsigned long fd_max_size;
 	char fn[MAXPATHLEN + 64];
 
-	pthread_mutex_t *fd_lock;
 	FILE *ofile;
 	char *ofile_buffer;
-	int ofile_flush;
+	int ofile_stdout;
 
 	unsigned long events_processed;
 	struct device_information *device;
+
+	int exited;
+
+	pthread_mutex_t lock;
+	struct list_head subbuf_list;
+	void *leftover_ts;
+	int leftover_ts_len;
+	int leftover_ts_max;
 };
 
 struct device_information {
@@ -181,8 +196,6 @@ static volatile int trace_stopped;
 #define is_stat_shown()	(*(volatile int *)(&stat_shown))
 static volatile int stat_shown;
 
-static pthread_mutex_t stdout_mutex = PTHREAD_MUTEX_INITIALIZER;
-
 static void exit_trace(int status);
 
 #define dip_tracing(dip)	(*(volatile int *)(&(dip)->trace_started))
@@ -192,8 +205,8 @@ static void exit_trace(int status);
 	for (__i = 0, __d = device_information; __i < __e; __i++, __d++)
 #define for_each_dip(__d, __i)	__for_each_dip(__d, __i, ndevs)
 
-#define for_each_tip(__d, __t, __i) \
-	for (__i = 0, __t = (__d)->threads; __i < ncpus; __i++, __t++)
+#define for_each_tip(__d, __t, __j) \
+	for (__j = 0, __t = (__d)->threads; __j < ncpus; __j++, __t++)
 
 static int get_dropped_count(const char *buts_name)
 {
@@ -273,126 +286,129 @@ static void wait_for_data(struct thread_information *tip)
 {
 	struct pollfd pfd = { .fd = tip->fd, .events = POLLIN };
 
-	poll(&pfd, 1, 10);
+	do {
+		poll(&pfd, 1, 10);
+		if (pfd.revents & POLLIN)
+			break;
+		if (tip->ofile_stdout)
+			break;
+	} while (!is_done());
 }
 
-static int __read_data(struct thread_information *tip, void *buf, int len,
-		       int block)
+static int read_data(struct thread_information *tip, void *buf, int len)
 {
 	int ret = 0;
 
-	while (!is_done()) {
-		ret = read(tip->fd, buf, len);
-		if (ret > 0)
-			break;
-		else if (!ret) {
-			if (!block)
-				break;
+	do {
+		wait_for_data(tip);
 
-			wait_for_data(tip);
-		} else {
+		ret = read(tip->fd, buf, len);
+		if (!ret)
+			continue;
+		else if (ret > 0)
+			return ret;
+		else {
 			if (errno != EAGAIN) {
 				perror(tip->fn);
 				fprintf(stderr,"Thread %d failed read of %s\n",
 					tip->cpu, tip->fn);
 				break;
 			}
 
-			if (!block) {
-				ret = 0;
-				break;
-			}
-
-			wait_for_data(tip);
+			continue;
 		}
-	}
+	} while (!is_done());
 
 	return ret;
 }
 
-#define can_grow_ring(tip)	((tip)->fd_max_size < RING_MAX_NR * buf_size * buf_nr)
-
-static int resize_ringbuffer(struct thread_information *tip)
+static inline void tip_fd_unlock(struct thread_information *tip)
 {
-	if (!can_grow_ring(tip))
-		return 1;
-
-	tip->fd_buf = realloc(tip->fd_buf, 2 * tip->fd_max_size);
-
-	/*
-	 * if the ring currently wraps, copy range over
-	 */
-	if (tip->fd_off + tip->fd_size > tip->fd_max_size) {
-		unsigned long wrap_size = tip->fd_size - (tip->fd_max_size - tip->fd_off);
-		memmove(tip->fd_buf + tip->fd_max_size, tip->fd_buf, wrap_size);
-	}
+	pthread_mutex_unlock(&tip->lock);
+}
 
-	tip->fd_max_size <<= 1;
-	return 0;
+static inline void tip_fd_lock(struct thread_information *tip)
+{
+	pthread_mutex_lock(&tip->lock);
 }
 
-static int __refill_ringbuffer(struct thread_information *tip, int len,
-			       int block)
+static int get_subbuf(struct thread_information *tip)
 {
-	unsigned long off;
-	int ret;
+	struct tip_subbuf *ts;
+	int ts_size;
 
-	off = (tip->fd_size + tip->fd_off) & (tip->fd_max_size - 1);
-	if (off + len > tip->fd_max_size)
-		len = tip->fd_max_size - off;
+	/*
+	 * live tracing should get a lower count to make it more "realtime"
	 */
+	if (tip->ofile_stdout)
+		ts_size = 1024;
+	else
+		ts_size = buf_size;
 
-	assert(len > 0);
+	ts = malloc(sizeof(*ts));
+	ts->buf = malloc(ts_size);
+	ts->max_len = ts_size;
 
-	ret = __read_data(tip, tip->fd_buf + off, len, block);
-	if (ret < 0)
-		return -1;
+	ts->len = read_data(tip, ts->buf, ts_size);
+	if (ts->len > 0) {
+		tip_fd_lock(tip);
+		list_add_tail(&ts->list, &tip->subbuf_list);
+		tip_fd_unlock(tip);
+		return 0;
+	}
 
-	tip->fd_size += ret;
-	return ret;
+	free(ts->buf);
+	free(ts);
+	return -1;
 }
 
-/*
- * keep filling ring until we get a short read
- */
-static void refill_ringbuffer(struct thread_information *tip, int block)
+static void close_thread(struct thread_information *tip)
 {
-	int len = buf_size;
-	int ret;
-
-	do {
-		if (len + tip->fd_size > tip->fd_max_size)
-			resize_ringbuffer(tip);
+	if (tip->fd != -1)
+		close(tip->fd);
+	if (tip->ofile)
+		fclose(tip->ofile);
+	if (tip->ofile_buffer)
+		free(tip->ofile_buffer);
+	if (tip->fd_buf)
+		free(tip->fd_buf);
 
-		ret = __refill_ringbuffer(tip, len, block);
-	} while ((ret == len) && !is_done());
+	tip->fd = -1;
+	tip->ofile = NULL;
+	tip->ofile_buffer = NULL;
+	tip->fd_buf = NULL;
 }
 
-static int read_data(struct thread_information *tip, void *buf,
-		     unsigned int len)
+static void *thread_main(void *arg)
 {
-	unsigned int start_size, end_size;
+	struct thread_information *tip = arg;
+	pid_t pid = getpid();
+	cpu_set_t cpu_mask;
 
-	refill_ringbuffer(tip, len > tip->fd_size);
+	CPU_ZERO(&cpu_mask);
+	CPU_SET((tip->cpu), &cpu_mask);
 
-	if (len > tip->fd_size)
-		return -1;
+	if (sched_setaffinity(pid, sizeof(cpu_mask), &cpu_mask) == -1) {
+		perror("sched_setaffinity");
+		exit_trace(1);
+	}
 
-	/*
-	 * see if we wrap the ring
-	 */
-	start_size = len;
-	end_size = 0;
-	if (len > (tip->fd_max_size - tip->fd_off)) {
-		start_size = tip->fd_max_size - tip->fd_off;
-		end_size = len - start_size;
+	snprintf(tip->fn, sizeof(tip->fn), "%s/block/%s/trace%d",
+		 relay_path, tip->device->buts_name, tip->cpu);
+	tip->fd = open(tip->fn, O_RDONLY);
+	if (tip->fd < 0) {
+		perror(tip->fn);
+		fprintf(stderr,"Thread %d failed open of %s\n", tip->cpu,
+			tip->fn);
+		exit_trace(1);
 	}
 
-	memcpy(buf, tip->fd_buf + tip->fd_off, start_size);
-	if (end_size)
-		memcpy(buf + start_size, tip->fd_buf, end_size);
+	for (;;) {
+		if (get_subbuf(tip))
+			break;
+	}
 
-	tip->fd_off = (tip->fd_off + len) & (tip->fd_max_size - 1);
-	tip->fd_size -= len;
-	return 0;
+	tip->exited = 1;
+	return NULL;
 }
 
 static int write_data(struct thread_information *tip,
@@ -411,214 +427,130 @@ static int write_data(struct thread_information *tip,
 		}
 	}
 
-	if (tip->ofile_flush)
+	if (tip->ofile_stdout)
 		fflush(tip->ofile);
 
 	return 0;
 }
 
-static void *extract_data(struct thread_information *tip, int nb)
+static int flush_subbuf(struct thread_information *tip, struct tip_subbuf *ts)
 {
-	unsigned char *buf;
+	unsigned int offset = 0;
+	struct blk_io_trace *t;
+	int pdu_len, events = 0;
 
-	buf = malloc(nb);
-	if (!read_data(tip, buf, nb))
-		return buf;
+	/*
+	 * surplus from last run, a little ugly...
	 */
+	if (tip->leftover_ts_len) {
+		if (ts->len + tip->leftover_ts_len > ts->max_len)
+			ts->buf = realloc(ts->buf, ts->len + tip->leftover_ts_len);
 
-	free(buf);
-	return NULL;
-}
+		memmove(ts->buf + tip->leftover_ts_len, ts->buf, ts->len);
+		memcpy(ts->buf, tip->leftover_ts, tip->leftover_ts_len);
+		ts->len += tip->leftover_ts_len;
+		tip->leftover_ts_len = 0;
+	}
 
-/*
- * trace may start inside 'bit' or may need to be gotten further on
- */
-static int get_event_slow(struct thread_information *tip,
-			  struct blk_io_trace *bit)
-{
-	const int inc = sizeof(__u32);
-	struct blk_io_trace foo;
-	unsigned int offset;
-	void *p;
+	while (offset + sizeof(*t) <= ts->len) {
+		t = ts->buf + offset;
 
-	/*
-	 * check if trace is inside
-	 */
-	offset = 0;
-	p = bit;
-	while (offset < sizeof(*bit)) {
-		p += inc;
-		offset += inc;
+		if (verify_trace(t))
+			return -1;
 
-		memcpy(&foo, p, inc);
+		pdu_len = t->pdu_len;
 
-		if (CHECK_MAGIC(&foo))
+		if (offset + sizeof(*t) + pdu_len > ts->len)
 			break;
-	}
 
-	/*
-	 * part trace found inside, read the rest
-	 */
-	if (offset < sizeof(*bit)) {
-		int good_bytes = sizeof(*bit) - offset;
+		trace_to_be(t);
 
-		memmove(bit, p, good_bytes);
-		p = (void *) bit + good_bytes;
+		if (write_data(tip, t, sizeof(*t) + pdu_len))
+			return -1;
 
-		return read_data(tip, p, offset);
+		offset += sizeof(*t) + pdu_len;
+		tip->events_processed++;
+		events++;
 	}
 
 	/*
-	 * nothing found, keep looking for start of trace
-	 */
-	do {
-		if (read_data(tip, bit, sizeof(bit->magic)))
-			return -1;
-	} while (!CHECK_MAGIC(bit));
-
-	/*
-	 * now get the rest of it
+	 * leftover bytes, save them for next time
 	 */
-	p = &bit->sequence;
-	if (read_data(tip, p, sizeof(*bit) - inc))
-		return -1;
+	if (offset != ts->len) {
+		int surplus = ts->len - offset;
 
-	return 0;
-}
-
-/*
- * Sometimes relayfs screws us a little, if an event crosses a sub buffer
- * boundary. So keep looking forward in the trace data until an event
- * is found
- */
-static int get_event(struct thread_information *tip, struct blk_io_trace *bit)
-{
-	/*
-	 * optimize for the common fast case, a full trace read that
-	 * succeeds
-	 */
-	if (read_data(tip, bit, sizeof(*bit)))
-		return -1;
+		t = ts->buf + offset;
+		if (surplus > tip->leftover_ts_max) {
+			tip->leftover_ts = realloc(tip->leftover_ts, surplus);
+			tip->leftover_ts_max = surplus;
+		}
 
-	if (CHECK_MAGIC(bit))
-		return 0;
+		memcpy(tip->leftover_ts, ts->buf + offset, surplus);
+		tip->leftover_ts_len = surplus;
+	}
 
-	/*
-	 * ok that didn't work, the event may start somewhere inside the
-	 * trace itself
-	 */
-	return get_event_slow(tip, bit);
+	free(ts->buf);
+	free(ts);
+	return events;
 }
 
-static inline void tip_fd_unlock(struct thread_information *tip)
+static int write_tip_events(struct thread_information *tip)
 {
-	if (tip->fd_lock)
-		pthread_mutex_unlock(tip->fd_lock);
-}
+	struct tip_subbuf *ts = NULL;
 
-static inline void tip_fd_lock(struct thread_information *tip)
-{
-	if (tip->fd_lock)
-		pthread_mutex_lock(tip->fd_lock);
-}
+	tip_fd_lock(tip);
+	if (!list_empty(&tip->subbuf_list)) {
+		ts = list_entry(tip->subbuf_list.next, struct tip_subbuf, list);
+		list_del(&ts->list);
+	}
+	tip_fd_unlock(tip);
 
-static void close_thread(struct thread_information *tip)
-{
-	if (tip->fd != -1)
-		close(tip->fd);
-	if (tip->ofile)
-		fclose(tip->ofile);
-	if (tip->ofile_buffer)
-		free(tip->ofile_buffer);
-	if (tip->fd_buf)
-		free(tip->fd_buf);
+	if (ts)
+		return flush_subbuf(tip, ts);
 
-	tip->fd = -1;
-	tip->ofile = NULL;
-	tip->ofile_buffer = NULL;
-	tip->fd_buf = NULL;
+	return 0;
 }
 
-static void *extract(void *arg)
+/*
+ * scans the tips we know and writes out the subbuffers we accumulate
+ */
+static void get_and_write_events(void)
 {
-	struct thread_information *tip = arg;
-	int pdu_len;
-	char *pdu_data;
-	struct blk_io_trace t;
-	pid_t pid = getpid();
-	cpu_set_t cpu_mask;
+	struct device_information *dip;
+	struct thread_information *tip;
+	int i, j, events, ret, all_exited;
 
-	CPU_ZERO(&cpu_mask);
-	CPU_SET((tip->cpu), &cpu_mask);
+	while (!is_done()) {
+		events = 0;
 
-	if (sched_setaffinity(pid, sizeof(cpu_mask), &cpu_mask) == -1) {
-		perror("sched_setaffinity");
-		exit_trace(1);
-	}
+		for_each_dip(dip, i) {
+			for_each_tip(dip, tip, j) {
+				ret = write_tip_events(tip);
+				if (ret > 0)
+					events += ret;
+			}
+		}
 
-	snprintf(tip->fn, sizeof(tip->fn), "%s/block/%s/trace%d",
-		 relay_path, tip->device->buts_name, tip->cpu);
-	tip->fd = open(tip->fn, O_RDONLY | O_NONBLOCK);
-	if (tip->fd < 0) {
-		perror(tip->fn);
-		fprintf(stderr,"Thread %d failed open of %s\n", tip->cpu,
-			tip->fn);
-		exit_trace(1);
+		if (!events)
+			usleep(10);
 	}
 
 	/*
-	 * start with a ringbuffer that is twice the size of the kernel side
+	 * reap stored events
 	 */
-	tip->fd_max_size = buf_size * buf_nr * RING_INIT_NR;
-	tip->fd_buf = malloc(tip->fd_max_size);
-	tip->fd_off = 0;
-	tip->fd_size = 0;
-
-	pdu_data = NULL;
-	while (1) {
-		if (get_event(tip, &t))
-			break;
-
-		if (verify_trace(&t))
-			break;
-
-		pdu_len = t.pdu_len;
-
-		trace_to_be(&t);
-
-		if (pdu_len) {
-			pdu_data = extract_data(tip, pdu_len);
-			if (!pdu_data)
-				break;
-		}
-
-		/*
-		 * now we have both trace and payload, get a lock on the
-		 * output descriptor and send it off
-		 */
-		tip_fd_lock(tip);
-
-		if (write_data(tip, &t, sizeof(t))) {
-			tip_fd_unlock(tip);
-			break;
-		}
-
-		if (pdu_data && write_data(tip, pdu_data, pdu_len)) {
-			tip_fd_unlock(tip);
-			break;
-		}
-
-		tip_fd_unlock(tip);
-
-		if (pdu_data) {
-			free(pdu_data);
-			pdu_data = NULL;
+	do {
+		events = 0;
+		all_exited = 0;
+		for_each_dip(dip, i) {
+			for_each_tip(dip, tip, j) {
+				ret = write_tip_events(tip);
+				if (ret > 0)
+					events += ret;
+				all_exited += !tip->exited;
+			}
 		}
-
-		tip->events_processed++;
-	}
-
-	close_thread(tip);
-	return NULL;
+		usleep(10);
+	} while (events && !all_exited);
 }
 
 static int start_threads(struct device_information *dip)
@@ -631,13 +563,15 @@ static int start_threads(struct device_information *dip)
 	for_each_tip(dip, tip, j) {
 		tip->cpu = j;
 		tip->device = dip;
-		tip->fd_lock = NULL;
 		tip->events_processed = 0;
+		INIT_LIST_HEAD(&tip->subbuf_list);
+		tip->leftover_ts_len = 0;
+		tip->leftover_ts_max = 0;
+		tip->leftover_ts = NULL;
 
 		if (pipeline) {
 			tip->ofile = fdopen(STDOUT_FILENO, "w");
-			tip->fd_lock = &stdout_mutex;
-			tip->ofile_flush = 1;
+			tip->ofile_stdout = 1;
 			mode = _IOLBF;
 			vbuf_size = 512;
 		} else {
@@ -654,7 +588,7 @@ static int start_threads(struct device_information *dip)
 					dip->buts_name, tip->cpu);
 			}
 			tip->ofile = fopen(op, "w");
-			tip->ofile_flush = 0;
+			tip->ofile_stdout = 0;
 			mode = _IOFBF;
 			vbuf_size = OFILE_BUF;
 		}
@@ -671,7 +605,7 @@ static int start_threads(struct device_information *dip)
 			return 1;
 		}
 
-		if (pthread_create(&tip->thread, NULL, extract, tip)) {
+		if (pthread_create(&tip->thread, NULL, thread_main, tip)) {
 			perror("pthread_create");
 			close_thread(tip);
 			return 1;
@@ -687,8 +621,10 @@ static void stop_threads(struct device_information *dip)
 	unsigned long ret;
 	int i;
 
-	for_each_tip(dip, tip, i)
+	for_each_tip(dip, tip, i) {
 		(void) pthread_join(tip->thread, (void *) &ret);
+		close_thread(tip);
+	}
 }
 
 static void stop_all_threads(void)
@@ -800,33 +736,30 @@ static int start_devices(void)
 
 static void show_stats(void)
 {
-	int i, j, no_stdout = 0;
 	struct device_information *dip;
 	struct thread_information *tip;
 	unsigned long long events_processed;
 	unsigned long total_drops;
+	int i, j;
 
 	if (is_stat_shown())
 		return;
 
 	stat_shown = 1;
 
-	if (output_name && !strcmp(output_name, "-"))
-		no_stdout = 1;
-
 	total_drops = 0;
 	for_each_dip(dip, i) {
-		if (!no_stdout)
+		if (!tip->ofile_stdout)
 			printf("Device: %s\n", dip->path);
 		events_processed = 0;
 		for_each_tip(dip, tip, j) {
-			if (!no_stdout)
+			if (!tip->ofile_stdout)
 				printf("  CPU%3d: %20ld events\n",
 					tip->cpu, tip->events_processed);
 			events_processed += tip->events_processed;
 		}
 		total_drops += dip->drop_count;
-		if (!no_stdout)
+		if (!tip->ofile_stdout)
 			printf("  Total:  %20lld events (dropped %lu)\n",
 					events_processed, dip->drop_count);
 	}
@@ -857,13 +790,6 @@ static void show_usage(char *program)
 static void handle_sigint(__attribute__((__unused__)) int sig)
 {
 	done = 1;
-	if (!is_trace_stopped()) {
-		trace_stopped = 1;
-		stop_all_threads();
-		stop_all_traces();
-	}
-
-	show_stats();
 }
 
 int main(int argc, char *argv[])
@@ -1006,8 +932,7 @@ int main(int argc, char *argv[])
 	if (stop_watch)
 		alarm(stop_watch);
 
-	while (!is_done())
-		sleep(1);
+	get_and_write_events();
 
 	if (!is_trace_stopped()) {
 		trace_stopped = 1;
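
The reworked flow above is a straightforward producer/consumer split: each per-CPU thread_main() fills tip_subbuf buffers via get_subbuf() and appends them to the mutex-protected subbuf_list, while the main process loops in get_and_write_events(), popping sub-buffers through write_tip_events(), flushing complete events, sleeping briefly when nothing is queued and reaping whatever remains once the reader threads have exited. The stand-alone sketch below illustrates only that shape; it is not code from the blktrace tree, and the simplified names (subbuf, reader_main, drain) are stand-ins for tip_subbuf, thread_main() and write_tip_events().

/*
 * Minimal sketch of the scheme introduced by this patch: a reader thread
 * queues filled sub-buffers on a mutex-protected list, the main loop pops
 * and writes them, then reaps what is left after the reader exits.
 * Names and buffer contents are simplified stand-ins, not blktrace code.
 */
#include <pthread.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <unistd.h>

struct subbuf {
	struct subbuf *next;
	char *buf;
	int len;
};

static pthread_mutex_t lock = PTHREAD_MUTEX_INITIALIZER;
static struct subbuf *head, *tail;		/* FIFO of filled sub-buffers */
static volatile int reader_exited;

/* producer: the role thread_main()/get_subbuf() play per CPU */
static void *reader_main(void *arg)
{
	int i;

	(void) arg;
	for (i = 0; i < 16; i++) {
		struct subbuf *sb = malloc(sizeof(*sb));

		sb->buf = strdup("trace data");	/* stands in for read_data() */
		sb->len = strlen(sb->buf);
		sb->next = NULL;

		pthread_mutex_lock(&lock);	/* tip_fd_lock() */
		if (tail)
			tail->next = sb;
		else
			head = sb;
		tail = sb;
		pthread_mutex_unlock(&lock);	/* tip_fd_unlock() */
	}

	reader_exited = 1;
	return NULL;
}

/* consumer step: the role write_tip_events()/flush_subbuf() play */
static int drain(void)
{
	struct subbuf *sb;

	pthread_mutex_lock(&lock);
	sb = head;
	if (sb) {
		head = sb->next;
		if (!head)
			tail = NULL;
	}
	pthread_mutex_unlock(&lock);

	if (!sb)
		return 0;

	printf("wrote %d bytes\n", sb->len);	/* write_data() equivalent */
	free(sb->buf);
	free(sb);
	return 1;
}

int main(void)
{
	pthread_t thread;

	pthread_create(&thread, NULL, reader_main, NULL);

	/* pull loop: write queued sub-buffers, sleep briefly when idle */
	while (!reader_exited)
		if (!drain())
			usleep(10);

	pthread_join(thread, NULL);

	/* reap sub-buffers queued after the pull loop stopped */
	while (drain())
		;

	return 0;
}

Moving the output path out of the per-CPU readers this way is what lets the patch drop stdout_mutex and the per-event fd_lock handling the old extract() loop needed.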