From 8e86c98a76fc3796d0ef0ac7d749475aa1e0ad02 Mon Sep 17 00:00:00 2001 From: Jens Axboe Date: Tue, 14 Feb 2006 10:51:39 +0100 Subject: [PATCH] [PATCH] blktrace: first cut at adding network support --- blktrace.c | 517 ++++++++++++++++++++++++++++++++++++++++++++++------- 1 file changed, 453 insertions(+), 64 deletions(-) diff --git a/blktrace.c b/blktrace.c index 549f484..6a7c58a 100644 --- a/blktrace.c +++ b/blktrace.c @@ -31,12 +31,17 @@ #include #include #include +#include #include #include #include #include #include #include +#include +#include +#include +#include #include "blktrace.h" #include "barrier.h" @@ -54,7 +59,7 @@ static char blktrace_version[] = "0.99"; #define RELAYFS_TYPE 0xF0B4A981 -#define S_OPTS "d:a:A:r:o:kw:Vb:n:D:" +#define S_OPTS "d:a:A:r:o:kw:Vb:n:D:lh:p:" static struct option l_opts[] = { { .name = "dev", @@ -122,6 +127,24 @@ static struct option l_opts[] = { .flag = NULL, .val = 'D' }, + { + .name = "listen", + .has_arg = no_argument, + .flag = NULL, + .val = 'l' + }, + { + .name = "host", + .has_arg = required_argument, + .flag = NULL, + .val = 'h' + }, + { + .name = "port", + .has_arg = required_argument, + .flag = NULL, + .val = 'p' + }, { .name = NULL, } @@ -131,6 +154,7 @@ struct tip_subbuf { void *buf; unsigned int len; unsigned int max_len; + off_t offset; }; #define FIFO_SIZE (1024) /* should be plenty big! */ @@ -150,9 +174,13 @@ struct thread_information { void *fd_buf; char fn[MAXPATHLEN + 64]; + char ofname[64]; + FILE *ofile; + off_t ofile_offset; char *ofile_buffer; int ofile_stdout; + int ofile_mmap; unsigned long events_processed; unsigned long long data_read; @@ -209,6 +237,8 @@ static volatile int trace_stopped; #define is_stat_shown() (*(volatile int *)(&stat_shown)) static volatile int stat_shown; +int data_is_native = -1; + static void exit_trace(int status); #define dip_tracing(dip) (*(volatile int *)(&(dip)->trace_started)) @@ -221,6 +251,40 @@ static void exit_trace(int status); #define for_each_tip(__d, __t, __j) \ for (__j = 0, __t = (__d)->threads; __j < ncpus; __j++, __t++) +/* + * networking stuff follows. we include a magic number so we know whether + * to endianness convert or not + */ +struct blktrace_net_hdr { + u32 magic; /* same as trace magic */ + char ofname[64]; /* trace name */ + u32 cpu; /* for which cpu */ + u32 len; /* length of following trace data */ +}; + +#define TRACE_NET_PORT (8462) + +enum { + Net_none = 0, + Net_server, + Net_client, +}; + +/* + * network cmd line params + */ +static char hostname[MAXHOSTNAMELEN]; +static int net_port = TRACE_NET_PORT; +static int net_mode = 0; + +static int net_in_fd = -1; +static int net_out_fd = -1; + +static void handle_sigint(__attribute__((__unused__)) int sig) +{ + done = 1; +} + static int get_dropped_count(const char *buts_name) { int fd; @@ -315,7 +379,7 @@ static void wait_for_data(struct thread_information *tip) } while (!is_done()); } -static int read_data(struct thread_information *tip, void *buf, int len) +static int read_data_file(struct thread_information *tip, void *buf, int len) { int ret = 0; @@ -339,9 +403,45 @@ static int read_data(struct thread_information *tip, void *buf, int len) } while (!is_done()); return ret; + } -static inline struct tip_subbuf *subbuf_fifo_dequeue(struct thread_information *tip) +static int read_data_net(struct thread_information *tip, void *buf, int len) +{ + unsigned int bytes_left = len; + int ret = 0; + + do { + ret = recv(net_in_fd, buf, bytes_left, MSG_WAITALL); + + if (!ret) + continue; + else if (ret < 0) { + if (errno != EAGAIN) { + perror(tip->fn); + fprintf(stderr, "server: failed read\n"); + return 0; + } + continue; + } else { + buf += ret; + bytes_left -= ret; + } + } while (!is_done() && bytes_left); + + return len; +} + +static int read_data(struct thread_information *tip, void *buf, int len) +{ + if (net_mode == Net_server) + return read_data_net(tip, buf, len); + + return read_data_file(tip, buf, len); +} + +static inline struct tip_subbuf * +subbuf_fifo_dequeue(struct thread_information *tip) { const int head = tip->fifo.head; const int next = (head + 1) & (FIFO_SIZE - 1); @@ -377,7 +477,7 @@ static inline int subbuf_fifo_queue(struct thread_information *tip, /* * For file output, truncate and mmap the file appropriately */ -static int mmap_subbuf(struct thread_information *tip) +static int mmap_subbuf(struct thread_information *tip, unsigned int maxlen) { int ofd = fileno(tip->ofile); int ret; @@ -410,7 +510,7 @@ static int mmap_subbuf(struct thread_information *tip) mlock(tip->fs_buf, tip->fs_buf_len); } - ret = read_data(tip, tip->fs_buf + tip->fs_off, buf_size); + ret = read_data(tip, tip->fs_buf + tip->fs_off, maxlen); if (ret >= 0) { tip->data_read += ret; tip->fs_size += ret; @@ -426,21 +526,46 @@ static int mmap_subbuf(struct thread_information *tip) */ static int get_subbuf(struct thread_information *tip) { - struct tip_subbuf *ts; - int ret; + struct tip_subbuf *ts = malloc(sizeof(*ts)); + int ret = 1; - ts = malloc(sizeof(*ts)); - ts->buf = malloc(buf_size); - ts->max_len = buf_size; + /* + * for the net client, don't actually read the data in as we will + * use sendfile to transmit it. just mark how much data we have + */ + if (net_mode == Net_client) { + struct stat sb; - ret = read_data(tip, ts->buf, ts->max_len); - if (ret > 0) { - ts->len = ret; - return subbuf_fifo_queue(tip, ts); + ts->buf = NULL; + + if (fstat(tip->fd, &sb) < 0) { + perror("trace stat"); + goto out; + } + + ts->len = sb.st_size - tip->ofile_offset; + ts->max_len = ts->len; + ts->offset = tip->ofile_offset; + tip->ofile_offset += ts->len; + ret = subbuf_fifo_queue(tip, ts); + } else { + ts->buf = malloc(buf_size); + ts->max_len = buf_size; + + ret = read_data(tip, ts->buf, ts->max_len); + if (ret > 0) { + ts->len = ret; + ret = subbuf_fifo_queue(tip, ts); + } + } + +out: + if (ret) { + if (ts->buf) + free(ts->buf); + free(ts); } - free(ts->buf); - free(ts); return ret; } @@ -461,6 +586,21 @@ static void close_thread(struct thread_information *tip) tip->fd_buf = NULL; } +static void tip_ftrunc_final(struct thread_information *tip) +{ + /* + * truncate to right size and cleanup mmap + */ + if (tip->ofile_mmap) { + int ofd = fileno(tip->ofile); + + if (tip->fs_buf) + munmap(tip->fs_buf, tip->fs_buf_len); + + ftruncate(ofd, tip->fs_size); + } +} + static void *thread_main(void *arg) { struct thread_information *tip = arg; @@ -486,33 +626,64 @@ static void *thread_main(void *arg) } while (!is_done()) { - if (tip->ofile_stdout) { - if (get_subbuf(tip)) + if (tip->ofile_mmap && net_mode != Net_client) { + if (mmap_subbuf(tip, buf_size)) break; } else { - if (mmap_subbuf(tip)) + if (get_subbuf(tip)) break; } } - /* - * truncate to right size and cleanup mmap - */ - if (!tip->ofile_stdout) { - int ofd = fileno(tip->ofile); + tip_ftrunc_final(tip); + tip->exited = 1; + return NULL; +} - if (tip->fs_buf) - munmap(tip->fs_buf, tip->fs_buf_len); +static int write_data_net(int fd, void *buf, unsigned int buf_len) +{ + unsigned int bytes_left = buf_len; + int ret; - ftruncate(ofd, tip->fs_size); + while (bytes_left) { + ret = send(fd, buf, bytes_left, 0); + if (ret < 0) { + perror("send"); + return 1; + } + + buf += ret; + bytes_left -= ret; } - tip->exited = 1; - return NULL; + return 0; } -static int write_data(struct thread_information *tip, - void *buf, unsigned int buf_len) +static int flush_subbuf_net(struct thread_information *tip, + struct tip_subbuf *ts) +{ + struct blktrace_net_hdr hdr; + int ret = 0; + + hdr.magic = BLK_IO_TRACE_MAGIC; + strcpy(hdr.ofname, tip->ofname); + hdr.cpu = tip->cpu; + hdr.len = ts->len; + + if (write_data_net(net_out_fd, &hdr, sizeof(hdr))) + return 1; + + if (sendfile(net_out_fd, tip->fd, &ts->offset, ts->len) < 0) { + perror("sendfile"); + ret = 1; + } + + free(ts); + return 0; +} + +static int write_data(struct thread_information *tip, void *buf, + unsigned int buf_len) { int ret; @@ -536,7 +707,8 @@ static int write_data(struct thread_information *tip, return 0; } -static int flush_subbuf(struct thread_information *tip, struct tip_subbuf *ts) +static int flush_subbuf_file(struct thread_information *tip, + struct tip_subbuf *ts) { unsigned int offset = 0; struct blk_io_trace *t; @@ -604,8 +776,12 @@ static int write_tip_events(struct thread_information *tip) { struct tip_subbuf *ts = subbuf_fifo_dequeue(tip); - if (ts) - return flush_subbuf(tip, ts); + if (ts) { + if (net_mode == Net_client) + return flush_subbuf_net(tip, ts); + + return flush_subbuf_file(tip, ts); + } return 0; } @@ -655,10 +831,11 @@ static void get_and_write_events(void) static void wait_for_threads(void) { /* - * for piped output, poll and fetch data for writeout. for files, - * we just wait around for trace threads to exit + * for piped or network output, poll and fetch data for writeout. + * for files, we just wait around for trace threads to exit */ - if (output_name && !strcmp(output_name, "-")) + if ((output_name && !strcmp(output_name, "-")) || + net_mode == Net_client) get_and_write_events(); else { struct device_information *dip; @@ -676,12 +853,24 @@ static void wait_for_threads(void) } } +static void fill_ofname(char *dst, char *buts_name, int cpu) +{ + int len = 0; + + if (output_dir) + len = sprintf(dst, "%s/", output_dir); + + if (output_name) + sprintf(dst + len, "%s.blktrace.%d", output_name, cpu); + else + sprintf(dst + len, "%s.blktrace.%d", buts_name, cpu); +} + static int start_threads(struct device_information *dip) { struct thread_information *tip; - char op[64]; int j, pipeline = output_name && !strcmp(output_name, "-"); - int len, mode, vbuf_size; + int mode, vbuf_size; for_each_tip(dip, tip, j) { tip->cpu = j; @@ -693,29 +882,20 @@ static int start_threads(struct device_information *dip) if (pipeline) { tip->ofile = fdopen(STDOUT_FILENO, "w"); tip->ofile_stdout = 1; + tip->ofile_mmap = 0; mode = _IOLBF; vbuf_size = 512; } else { - len = 0; - - if (output_dir) - len = sprintf(op, "%s/", output_dir); - - if (output_name) { - sprintf(op + len, "%s.blktrace.%d", output_name, - tip->cpu); - } else { - sprintf(op + len, "%s.blktrace.%d", - dip->buts_name, tip->cpu); - } - tip->ofile = fopen(op, "w+"); + fill_ofname(tip->ofname, dip->buts_name, tip->cpu); + tip->ofile = fopen(tip->ofname, "w+"); tip->ofile_stdout = 0; + tip->ofile_mmap = 1; mode = _IOFBF; vbuf_size = OFILE_BUF; } if (tip->ofile == NULL) { - perror(op); + perror(tip->ofname); return 1; } @@ -896,6 +1076,203 @@ static void show_stats(void) fprintf(stderr, "You have dropped events, consider using a larger buffer size (-b)\n"); } +static struct thread_information *net_tip_list; +static int net_tip_len; + +static struct thread_information *net_get_tip(struct blktrace_net_hdr *bnh) +{ + struct thread_information *tip; + int i; + + for (i = 0; i < net_tip_len; i++) { + tip = &net_tip_list[i]; + + if (!strcmp(bnh->ofname, tip->ofname)) + return tip; + } + + net_tip_list = realloc(net_tip_list, (net_tip_len + 1) * sizeof(*tip)); + tip = &net_tip_list[net_tip_len]; + net_tip_len++; + + memset(tip, 0, sizeof(*tip)); + + tip->cpu = bnh->cpu; + strcpy(tip->ofname, bnh->ofname); + + tip->ofile = fopen(tip->ofname, "w+"); + if (!tip->ofile) { + perror("fopen"); + return NULL; + } + + tip->ofile_stdout = 0; + tip->ofile_mmap = 1; + return tip; +} + +static int net_get_header(struct blktrace_net_hdr *bnh) +{ + int fl = fcntl(net_in_fd, F_GETFL); + int bytes_left, ret; + void *p = bnh; + + fcntl(net_in_fd, F_SETFL, fl | O_NONBLOCK); + bytes_left = sizeof(*bnh); + while (bytes_left && !is_done()) { + ret = recv(net_in_fd, p, bytes_left, MSG_WAITALL); + if (ret < 0) { + if (errno != EAGAIN) { + perror("recv header"); + return 1; + } + usleep(100); + continue; + } else if (!ret) { + usleep(100); + continue; + } else { + p += ret; + bytes_left -= ret; + } + } + fcntl(net_in_fd, F_SETFL, fl & ~O_NONBLOCK); + return 0; +} + +static int net_server_loop(void) +{ + struct thread_information *tip; + struct blktrace_net_hdr bnh; + + if (net_get_header(&bnh)) + return 1; + + if (data_is_native == -1 && check_data_endianness(bnh.magic)) { + fprintf(stderr, "server: received data is bad\n"); + return 1; + } + + if (!data_is_native) { + bnh.cpu = be32_to_cpu(bnh.cpu); + bnh.len = be32_to_cpu(bnh.len); + } + + tip = net_get_tip(&bnh); + if (!tip) + return 1; + + if (mmap_subbuf(tip, bnh.len)) + return 1; + + return 0; +} + +/* + * Start here when we are in server mode - just fetch data from the network + * and dump to files + */ +static int net_server(void) +{ + struct sockaddr_in addr; + socklen_t socklen; + int fd, opt, i; + + fd = socket(AF_INET, SOCK_STREAM, 0); + if (fd < 0) { + perror("server: socket"); + return 1; + } + + opt = 1; + if (setsockopt(fd, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt)) < 0) { + perror("setsockopt"); + return 1; + } + + memset(&addr, 0, sizeof(addr)); + addr.sin_family = AF_INET; + addr.sin_addr.s_addr = htonl(INADDR_ANY); + addr.sin_port = htons(net_port); + + if (bind(fd, (struct sockaddr *) &addr, sizeof(addr)) < 0) { + perror("bind"); + return 1; + } + + if (listen(fd, 1) < 0) { + perror("listen"); + return 1; + } + + printf("blktrace: waiting for incoming connection...\n"); + + socklen = sizeof(addr); + net_in_fd = accept(fd, (struct sockaddr *) &addr, &socklen); + if (net_in_fd < 0) { + perror("accept"); + return 1; + } + + signal(SIGINT, handle_sigint); + signal(SIGHUP, handle_sigint); + signal(SIGTERM, handle_sigint); + signal(SIGALRM, handle_sigint); + + printf("blktrace: connected!\n"); + + while (!is_done()) { + if (net_server_loop()) + break; + } + + for (i = 0; i < net_tip_len; i++) + tip_ftrunc_final(&net_tip_list[i]); + + return 0; +} + +/* + * Setup outgoing network connection where we will transmit data + */ +static int net_setup_client(void) +{ + struct sockaddr_in addr; + int fd; + + fd = socket(AF_INET, SOCK_STREAM, 0); + if (fd < 0) { + perror("client: socket"); + return 1; + } + + memset(&addr, 0, sizeof(addr)); + addr.sin_family = AF_INET; + addr.sin_port = htons(net_port); + + if (inet_aton(hostname, &addr.sin_addr) != 1) { + struct hostent *hent = gethostbyname(hostname); + if (!hent) { + perror("gethostbyname"); + return 1; + } + + memcpy(&addr.sin_addr, hent->h_addr, 4); + strcpy(hostname, hent->h_name); + } + + printf("blktrace: connecting to %s\n", hostname); + + if (connect(fd, (struct sockaddr *) &addr, sizeof(addr)) < 0) { + perror("client: connect"); + return 1; + } + + printf("blktrace: connected!\n"); + net_out_fd = fd; + return 0; +} + static char usage_str[] = \ "-d [ -r relay path ] [ -o ] [-k ] [ -w time ]\n" \ "[ -a action ] [ -A action mask ] [ -v ]\n\n" \ @@ -915,10 +1292,6 @@ static void show_usage(char *program) { fprintf(stderr, "Usage: %s %s %s",program, blktrace_version, usage_str); } -static void handle_sigint(__attribute__((__unused__)) int sig) -{ - done = 1; -} int main(int argc, char *argv[]) { @@ -998,6 +1371,16 @@ int main(int argc, char *argv[]) case 'D': output_dir = optarg; break; + case 'h': + net_mode = Net_client; + strcpy(hostname, optarg); + break; + case 'l': + net_mode = Net_server; + break; + case 'p': + net_port = atoi(optarg); + break; default: show_usage(argv[0]); return 1; @@ -1009,6 +1392,13 @@ int main(int argc, char *argv[]) return 1; } + setlocale(LC_NUMERIC, "en_US"); + + page_size = getpagesize(); + + if (net_mode == Net_server) + return net_server(); + if (ndevs == 0) { show_usage(argv[0]); return 1; @@ -1039,24 +1429,23 @@ int main(int argc, char *argv[]) return 0; } - setlocale(LC_NUMERIC, "en_US"); - ncpus = sysconf(_SC_NPROCESSORS_ONLN); if (ncpus < 0) { fprintf(stderr, "sysconf(_SC_NPROCESSORS_ONLN) failed\n"); return 1; } - page_size = getpagesize(); - - if (start_devices() != 0) - return 1; - signal(SIGINT, handle_sigint); signal(SIGHUP, handle_sigint); signal(SIGTERM, handle_sigint); signal(SIGALRM, handle_sigint); + if (net_mode == Net_client && net_setup_client()) + return 1; + + if (start_devices() != 0) + return 1; + atexit(stop_all_tracing); if (stop_watch) -- 2.25.1