From e0a1988b94de5bee7f0b7d8084694f451a6339ee Mon Sep 17 00:00:00 2001 From: Jens Axboe Date: Thu, 23 Feb 2006 13:29:06 +0100 Subject: [PATCH] [PATCH] blktrace: support multiple network connections --- blktrace.c | 292 +++++++++++++++++++++++++++++++++++------------------ 1 file changed, 191 insertions(+), 101 deletions(-) diff --git a/blktrace.c b/blktrace.c index beadcf7..9aefac7 100644 --- a/blktrace.c +++ b/blktrace.c @@ -218,15 +218,14 @@ struct device_information { char buts_name[32]; volatile int trace_started; unsigned long drop_count; - struct in_addr cl_in_addr; struct thread_information *threads; + struct net_connection *nc; }; static int ncpus; static struct thread_information *thread_information; static int ndevs; static struct device_information *device_information; -static int ndevs_running; /* command line option globals */ static char *relay_path; @@ -254,10 +253,14 @@ static void exit_trace(int status); #define dip_tracing(dip) (*(volatile int *)(&(dip)->trace_started)) #define dip_set_tracing(dip, v) ((dip)->trace_started = (v)) -#define __for_each_dip(__d, __i, __e) \ - for (__i = 0, __d = device_information; __i < __e; __i++, __d++) +#define __for_each_dip(__d, __i, __di, __e) \ + for (__i = 0, __d = __di; __i < __e; __i++, __d++) + +#define for_each_dip(__d, __i) \ + __for_each_dip(__d, __i, device_information, ndevs) +#define for_each_nc_dip(__nc, __d, __i) \ + __for_each_dip(__d, __i, (__nc)->device_information, (__nc)->ndevs) -#define for_each_dip(__d, __i) __for_each_dip(__d, __i, ndevs) #define for_each_tip(__d, __t, __j) \ for (__j = 0, __t = (__d)->threads; __j < ncpus; __j++, __t++) @@ -289,8 +292,19 @@ static int net_port = TRACE_NET_PORT; static int net_mode = 0; static int net_use_sendfile; -static int net_in_fd = -1; -static time_t net_connect_time; +struct net_connection { + int in_fd; + time_t connect_time; + struct in_addr cl_in_addr; + struct device_information *device_information; + int ndevs; + int ncpus; + int connection_index; +}; + +#define NET_MAX_CONNECTIONS (1024) +static struct net_connection net_connections[NET_MAX_CONNECTIONS]; +static int net_connects; static int net_out_fd = -1; static void handle_sigint(__attribute__((__unused__)) int sig) @@ -442,11 +456,12 @@ static int read_data_file(struct thread_information *tip, void *buf, static int read_data_net(struct thread_information *tip, void *buf, unsigned int len) { + struct net_connection *nc = tip->device->nc; unsigned int bytes_left = len; int ret = 0; do { - ret = recv(net_in_fd, buf, bytes_left, MSG_WAITALL); + ret = recv(nc->in_fd, buf, bytes_left, MSG_WAITALL); if (!ret) continue; @@ -966,8 +981,10 @@ static int fill_ofname(struct device_information *dip, len = sprintf(dst, "%s/", output_dir); if (net_mode == Net_server) { - len += sprintf(dst + len, "%s-", inet_ntoa(dip->cl_in_addr)); - len += strftime(dst + len, 64, "%F-%T/", gmtime(&net_connect_time)); + struct net_connection *nc = dip->nc; + + len += sprintf(dst + len, "%s-", inet_ntoa(nc->cl_in_addr)); + len += strftime(dst + len, 64, "%F-%T/", gmtime(&nc->connect_time)); } if (stat(dst, &sb) < 0) { @@ -1182,7 +1199,7 @@ static int start_devices(void) } if (i != ndevs) { - __for_each_dip(dip, j, i) + __for_each_dip(dip, j, device_information, i) stop_trace(dip); return 1; @@ -1197,7 +1214,7 @@ static int start_devices(void) } if (i != ndevs) { - __for_each_dip(dip, j, i) + __for_each_dip(dip, j, device_information, i) stop_threads(dip); for_each_dip(dip, i) stop_trace(dip); @@ -1208,7 +1225,7 @@ static int start_devices(void) return 0; } -static void show_stats(void) +static void show_stats(struct device_information *dips, int ndips, int cpus) { struct device_information *dip; struct thread_information *tip; @@ -1225,12 +1242,14 @@ static void show_stats(void) stat_shown = 1; total_drops = 0; - for_each_dip(dip, i) { + for (i = 0; i < ndips; i++) { + dip = &dips[i]; if (!no_stdout) printf("Device: %s\n", dip->path); events_processed = 0; data_read = 0; - for_each_tip(dip, tip, j) { + for (j = 0; j < cpus; j++) { + tip = &dip->threads[j]; if (!no_stdout) printf(" CPU%3d: %20lu events, %8llu KiB data\n", tip->cpu, tip->events_processed, @@ -1249,36 +1268,35 @@ static void show_stats(void) fprintf(stderr, "You have dropped events, consider using a larger buffer size (-b)\n"); } -static struct device_information *net_get_dip(char *buts_name, - struct in_addr *cl_in_addr) +static struct device_information *net_get_dip(struct net_connection *nc, + char *buts_name) { struct device_information *dip; int i; - for (i = 0; i < ndevs; i++) { - dip = &device_information[i]; + for (i = 0; i < nc->ndevs; i++) { + dip = &nc->device_information[i]; if (!strcmp(dip->buts_name, buts_name)) return dip; } - device_information = realloc(device_information, (ndevs + 1) * sizeof(*dip)); - dip = &device_information[ndevs]; + nc->device_information = realloc(device_information, (nc->ndevs + 1) * sizeof(*dip)); + dip = &nc->device_information[nc->ndevs]; memset(dip, 0, sizeof(*dip)); dip->fd = -1; - dip->cl_in_addr = *cl_in_addr; + dip->nc = nc; strcpy(dip->buts_name, buts_name); dip->path = strdup(buts_name); dip->trace_started = 1; - ndevs++; - ndevs_running++; - dip->threads = malloc(ncpus * sizeof(struct thread_information)); - memset(dip->threads, 0, ncpus * sizeof(struct thread_information)); + nc->ndevs++; + dip->threads = malloc(nc->ncpus * sizeof(struct thread_information)); + memset(dip->threads, 0, nc->ncpus * sizeof(struct thread_information)); /* * open all files */ - for (i = 0; i < ncpus; i++) { + for (i = 0; i < nc->ncpus; i++) { struct thread_information *tip = &dip->threads[i]; tip->cpu = i; @@ -1292,13 +1310,13 @@ static struct device_information *net_get_dip(char *buts_name, return dip; } -static struct thread_information *net_get_tip(struct blktrace_net_hdr *bnh, - struct in_addr *cl_in_addr) +static struct thread_information *net_get_tip(struct net_connection *nc, + struct blktrace_net_hdr *bnh) { struct device_information *dip; - ncpus = bnh->max_cpus; - dip = net_get_dip(bnh->buts_name, cl_in_addr); + nc->ncpus = bnh->max_cpus; + dip = net_get_dip(nc, bnh->buts_name); if (!dip->trace_started) { fprintf(stderr, "Events for closed devices %s\n", dip->buts_name); return NULL; @@ -1307,16 +1325,17 @@ static struct thread_information *net_get_tip(struct blktrace_net_hdr *bnh, return &dip->threads[bnh->cpu]; } -static int net_get_header(struct blktrace_net_hdr *bnh) +static int net_get_header(struct net_connection *nc, + struct blktrace_net_hdr *bnh) { - int fl = fcntl(net_in_fd, F_GETFL); + int fl = fcntl(nc->in_fd, F_GETFL); int bytes_left, ret; void *p = bnh; - fcntl(net_in_fd, F_SETFL, fl | O_NONBLOCK); + fcntl(nc->in_fd, F_SETFL, fl | O_NONBLOCK); bytes_left = sizeof(*bnh); while (bytes_left && !is_done()) { - ret = recv(net_in_fd, p, bytes_left, MSG_WAITALL); + ret = recv(nc->in_fd, p, bytes_left, MSG_WAITALL); if (ret < 0) { if (errno != EAGAIN) { perror("recv header"); @@ -1332,16 +1351,77 @@ static int net_get_header(struct blktrace_net_hdr *bnh) bytes_left -= ret; } } - fcntl(net_in_fd, F_SETFL, fl & ~O_NONBLOCK); + fcntl(nc->in_fd, F_SETFL, fl & ~O_NONBLOCK); return bytes_left; } -static int net_server_loop(struct in_addr *cl_in_addr) +/* + * finalize a net client: truncate files, show stats, cleanup, etc + */ +static void net_client_done(struct net_connection *nc) +{ + struct device_information *dip; + struct thread_information *tip; + struct net_connection *last_nc; + int i, j; + + for_each_nc_dip(nc, dip, i) { + for (j = 0; j < nc->ncpus; j++) { + tip = &dip->threads[j]; + + tip_ftrunc_final(tip); + } + } + + show_stats(nc->device_information, nc->ndevs, nc->ncpus); + + /* + * cleanup for next run + */ + for_each_nc_dip(nc, dip, i) { + for (j = 0; j < nc->ncpus; j++) { + tip = &dip->threads[j]; + + fclose(tip->ofile); + } + + free(dip->threads); + free(dip->path); + } + + free(nc->device_information); + nc->device_information = NULL; + nc->ncpus = nc->ndevs = 0; + + close(nc->in_fd); + nc->in_fd = -1; + + net_connects--; + + /* + * now put last entry where this one was, a little nasty since we + * need to adjust dip->nc as well + */ + if (nc->connection_index != net_connects) { + last_nc = &net_connections[net_connects]; + last_nc->connection_index = nc->connection_index; + *nc = *last_nc; + for_each_nc_dip(nc, dip, i) + dip->nc = nc; + } + + stat_shown = 0; +} + +/* + * handle incoming events from a net client + */ +static int net_client_data(struct net_connection *nc) { struct thread_information *tip; struct blktrace_net_hdr bnh; - if (net_get_header(&bnh)) + if (net_get_header(nc, &bnh)) return 1; if (data_is_native == -1 && check_data_endianness(bnh.magic)) { @@ -1369,16 +1449,16 @@ static int net_server_loop(struct in_addr *cl_in_addr) */ struct device_information *dip; - dip = net_get_dip(bnh.buts_name, cl_in_addr); + dip = net_get_dip(nc, bnh.buts_name); dip->drop_count = bnh.cpu; dip->trace_started = 0; - ndevs_running--; - fprintf(stderr, "server: end of run for %s\n", dip->buts_name); - return !ndevs_running; + printf("server: end of run for %s\n", dip->buts_name); + net_client_done(nc); + return 0; } - tip = net_get_tip(&bnh, cl_in_addr); + tip = net_get_tip(nc, &bnh); if (!tip) return 1; @@ -1388,30 +1468,77 @@ static int net_server_loop(struct in_addr *cl_in_addr) return 0; } -static int get_connection(int fd, struct sockaddr_in *addr) +static void net_add_connection(int listen_fd, struct sockaddr_in *addr) { - struct pollfd pfd = { .fd = fd, .events = POLLIN }; - socklen_t socklen; - - printf("blktrace: waiting for incoming connection...\n"); + socklen_t socklen = sizeof(*addr); + struct net_connection *nc; - if (poll(&pfd, 1, -1) < 0) { - perror("poll for connection"); - return 1; + if (net_connects == NET_MAX_CONNECTIONS) { + fprintf(stderr, "server: no more connections allowed\n"); + return; } - if ((pfd.revents & POLLIN) == 0) - return 1; - socklen = sizeof(*addr); - net_in_fd = accept(fd, (struct sockaddr *) addr, &socklen); - if (net_in_fd < 0) { + nc = &net_connections[net_connects]; + + nc->in_fd = accept(listen_fd, (struct sockaddr *) addr, &socklen); + if (nc->in_fd < 0) { perror("accept"); - return 1; + return; } - time(&net_connect_time); - printf("blktrace: connection from %s\n", inet_ntoa(addr->sin_addr)); - return 0; + printf("server: connection from %s\n", inet_ntoa(addr->sin_addr)); + time(&nc->connect_time); + nc->connection_index = net_connects; + nc->cl_in_addr = addr->sin_addr; + net_connects++; +} + +/* + * event driven loop, handle new incoming connections and data from + * existing connections + */ +static void net_server_handle_connections(int listen_fd, + struct sockaddr_in *addr) +{ + struct pollfd pfds[NET_MAX_CONNECTIONS + 1]; + int i, events; + + printf("server: waiting for connections...\n"); + + while (!is_done()) { + /* + * the zero entry is for incoming connections, remaining + * entries for clients + */ + pfds[0].fd = listen_fd; + pfds[0].events = POLLIN; + for (i = 0; i < net_connects; i++) { + pfds[i + 1].fd = net_connections[i].in_fd; + pfds[i + 1].events = POLLIN; + } + + events = poll(pfds, 1 + net_connects, -1); + if (events < 0) { + if (errno == EINTR) + continue; + + perror("poll"); + break; + } else if (!events) + continue; + + if (pfds[0].revents & POLLIN) { + net_add_connection(listen_fd, addr); + events--; + } + + for (i = 0; events && i < net_connects; i++) { + if (pfds[i + 1].revents & POLLIN) { + net_client_data(&net_connections[i]); + events--; + } + } + } } /* @@ -1420,10 +1547,8 @@ static int get_connection(int fd, struct sockaddr_in *addr) */ static int net_server(void) { - struct device_information *dip; - struct thread_information *tip; struct sockaddr_in addr; - int fd, opt, i, j; + int fd, opt; fd = socket(AF_INET, SOCK_STREAM, 0); if (fd < 0) { @@ -1452,43 +1577,8 @@ static int net_server(void) return 1; } -repeat: - if (get_connection(fd, &addr)) - return 0; - - while (!is_done()) { - if (net_server_loop(&addr.sin_addr)) - break; - } - - for_each_dip(dip, i) - for_each_tip(dip, tip, j) - tip_ftrunc_final(tip); - - show_stats(); - - if (is_done()) - return 0; - - /* - * cleanup for next run - */ - for_each_dip(dip, i) { - for_each_tip(dip, tip, j) - fclose(tip->ofile); - - free(dip->threads); - free(dip->path); - } - - free(device_information); - device_information = NULL; - ncpus = ndevs = 0; - - close(net_in_fd); - net_in_fd = -1; - stat_shown = 0; - goto repeat; + net_server_handle_connections(fd, &addr); + return 0; } /* @@ -1725,7 +1815,7 @@ int main(int argc, char *argv[]) stop_all_traces(); } - show_stats(); + show_stats(device_information, ndevs, ncpus); return 0; } -- 2.25.1