From: Tom Zanussi Date: Tue, 14 Mar 2006 08:02:07 +0000 (+0100) Subject: [PATCH] blktrace: per-cpu net connections X-Git-Tag: blktrace-0.99.2~55 X-Git-Url: https://git.kernel.dk/?a=commitdiff_plain;h=ff11d54c003b568a6ab94f4aa499f4ede0d549f1;hp=dd870ef6bac0cd5eb9ef1fe199e0670b006a940b;p=blktrace.git [PATCH] blktrace: per-cpu net connections We can get reliable poll() without POLLMSG tricks, if we open a net connection per CPU. --- diff --git a/blktrace.c b/blktrace.c index ca1caac..5c8377b 100644 --- a/blktrace.c +++ b/blktrace.c @@ -210,6 +210,8 @@ struct thread_information { unsigned long fs_off; void *fs_buf; unsigned long fs_buf_len; + + struct net_connection *nc; }; struct device_information { @@ -219,7 +221,10 @@ struct device_information { volatile int trace_started; unsigned long drop_count; struct thread_information *threads; - struct net_connection *nc; + + struct cl_host *ch; + u32 cl_id; + time_t cl_connect_time; }; static int ncpus; @@ -259,12 +264,14 @@ static void exit_trace(int status); #define for_each_dip(__d, __i) \ __for_each_dip(__d, device_information, ndevs, __i) #define for_each_nc_dip(__nc, __d, __i) \ - __for_each_dip(__d, (__nc)->device_information, (__nc)->ndevs, __i) + __for_each_dip(__d, (__nc)->ch->device_information, (__nc)->ch->ndevs, __i) #define __for_each_tip(__d, __t, __ncpus, __j) \ for (__j = 0, __t = (__d)->threads; __j < __ncpus; __j++, __t++) #define for_each_tip(__d, __t, __j) \ __for_each_tip(__d, __t, ncpus, __j) +#define for_each_cl_host(__c) \ + for (__c = cl_host_list; __c; __c = __c->list_next) /* * networking stuff follows. we include a magic number so we know whether @@ -276,6 +283,7 @@ struct blktrace_net_hdr { u32 cpu; /* for which cpu */ u32 max_cpus; u32 len; /* length of following trace data */ + u32 cl_id; /* id for set of client per-cpu connections */ }; #define TRACE_NET_PORT (8462) @@ -294,20 +302,31 @@ static int net_port = TRACE_NET_PORT; static int net_mode = 0; static int net_use_sendfile = 1; -struct net_connection { - int in_fd; - time_t connect_time; +struct cl_host { + struct cl_host *list_next; struct in_addr cl_in_addr; + struct net_connection *net_connections; + int nconn; struct device_information *device_information; int ndevs; int ncpus; - int connection_index; + int ndevs_done; }; -#define NET_MAX_CONNECTIONS (1024) -static struct net_connection net_connections[NET_MAX_CONNECTIONS]; +struct net_connection { + int in_fd; + struct pollfd pfd; + time_t connect_time; + struct cl_host *ch; + int ncpus; +}; + +#define NET_MAX_CL_HOSTS (1024) +static struct cl_host *cl_host_list; +static int cl_hosts; static int net_connects; -static int net_out_fd = -1; + +static int *net_out_fd; static void handle_sigint(__attribute__((__unused__)) int sig) { @@ -411,20 +430,20 @@ static void stop_all_traces(void) } } -static void wait_for_data(struct thread_information *tip, int events) +static void wait_for_data(struct thread_information *tip, int timeout) { - struct pollfd pfd = { .fd = tip->fd, .events = events }; + struct pollfd pfd = { .fd = tip->fd, .events = POLLIN }; - do { - if (poll(&pfd, 1, 100) < 0) { + while (!is_done()) { + if (poll(&pfd, 1, timeout) < 0) { perror("poll"); break; } - if (pfd.revents & events) + if (pfd.revents & POLLIN) break; if (tip->ofile_stdout) break; - } while (!is_done()); + } } static int read_data_file(struct thread_information *tip, void *buf, @@ -433,7 +452,7 @@ static int read_data_file(struct thread_information *tip, void *buf, int ret = 0; do { - wait_for_data(tip, POLLIN); + wait_for_data(tip, 100); ret = read(tip->fd, buf, len); if (!ret) @@ -458,7 +477,7 @@ static int read_data_file(struct thread_information *tip, void *buf, static int read_data_net(struct thread_information *tip, void *buf, unsigned int len) { - struct net_connection *nc = tip->device->nc; + struct net_connection *nc = tip->nc; unsigned int bytes_left = len; int ret = 0; @@ -586,43 +605,6 @@ static int get_subbuf(struct thread_information *tip, unsigned int maxlen) return ret; } -static int get_subbuf_sendfile(struct thread_information *tip, - unsigned int maxlen) -{ - struct tip_subbuf *ts; - struct stat sb; - unsigned int ready; - - wait_for_data(tip, POLLMSG); - - /* - * hack to get last data out, we can't use sendfile for that - */ - if (is_done()) - return get_subbuf(tip, maxlen); - - if (fstat(tip->fd, &sb) < 0) { - perror("trace stat"); - return -1; - } - ready = sb.st_size - tip->data_queued; - if (!ready) { - usleep(1000); - return 0; - } - - ts = malloc(sizeof(*ts)); - ts->buf = NULL; - ts->max_len = 0; - ts->len = ready; - tip->data_queued += ready; - - if (subbuf_fifo_queue(tip, ts)) - return -1; - - return ready; -} - static void close_thread(struct thread_information *tip) { if (tip->fd != -1) @@ -723,8 +705,9 @@ static int net_send_header(struct thread_information *tip, unsigned int len) hdr.cpu = tip->cpu; hdr.max_cpus = ncpus; hdr.len = len; + hdr.cl_id = getpid(); - return write_data_net(net_out_fd, &hdr, sizeof(hdr)); + return write_data_net(net_out_fd[tip->cpu], &hdr, sizeof(hdr)); } /* @@ -742,8 +725,9 @@ static void net_client_send_close(void) hdr.len = 0; strcpy(hdr.buts_name, dip->buts_name); hdr.cpu = get_dropped_count(dip->buts_name); + hdr.cl_id = getpid(); - write_data_net(net_out_fd, &hdr, sizeof(hdr)); + write_data_net(net_out_fd[0], &hdr, sizeof(hdr)); } } @@ -753,7 +737,7 @@ static int flush_subbuf_net(struct thread_information *tip, { if (net_send_header(tip, ts->len)) return -1; - if (write_data_net(net_out_fd, ts->buf, ts->len)) + if (write_data_net(net_out_fd[tip->cpu], ts->buf, ts->len)) return -1; free(ts->buf); @@ -763,7 +747,7 @@ static int flush_subbuf_net(struct thread_information *tip, static int net_sendfile(struct thread_information *tip, struct tip_subbuf *ts) { - int ret = sendfile(net_out_fd, tip->fd, NULL, ts->len); + int ret = sendfile(net_out_fd[tip->cpu], tip->fd, NULL, ts->len); if (ret < 0) { perror("sendfile"); @@ -781,14 +765,6 @@ static int flush_subbuf_sendfile(struct thread_information *tip, { int ret = -1; - /* - * currently we cannot use sendfile() on the last bytes read, as they - * may not be a full subbuffer. get_subbuf_sendfile() falls back to - * the read approach for those, so use send() to ship them out - */ - if (ts->buf) - return flush_subbuf_net(tip, ts); - if (net_send_header(tip, ts->len)) goto err; if (net_sendfile(tip, ts)) @@ -802,6 +778,37 @@ err: return ret; } +static int get_subbuf_sendfile(struct thread_information *tip, + unsigned int maxlen) +{ + struct tip_subbuf *ts; + struct stat sb; + unsigned int ready; + (void) maxlen; + + wait_for_data(tip, -1); + + if (fstat(tip->fd, &sb) < 0) { + perror("trace stat"); + return -1; + } + ready = sb.st_size - tip->data_queued; + if (!ready) { + usleep(1000); + return 0; + } + + ts = malloc(sizeof(*ts)); + ts->buf = NULL; + ts->max_len = 0; + ts->len = ready; + tip->data_queued += ready; + + flush_subbuf_sendfile(tip, ts); + + return ready; +} + static int write_data(struct thread_information *tip, void *buf, unsigned int buf_len) { @@ -951,7 +958,7 @@ static void wait_for_threads(void) * for files, we just wait around for trace threads to exit */ if ((output_name && !strcmp(output_name, "-")) || - net_mode == Net_client) + ((net_mode == Net_client) && !net_use_sendfile)) get_and_write_events(); else { struct device_information *dip; @@ -985,10 +992,10 @@ static int fill_ofname(struct device_information *dip, len = sprintf(dst, "./"); if (net_mode == Net_server) { - struct net_connection *nc = dip->nc; + struct net_connection *nc = tip->nc; - len += sprintf(dst + len, "%s-", inet_ntoa(nc->cl_in_addr)); - len += strftime(dst + len, 64, "%F-%T/", gmtime(&nc->connect_time)); + len += sprintf(dst + len, "%s-", inet_ntoa(nc->ch->cl_in_addr)); + len += strftime(dst + len, 64, "%F-%T/", gmtime(&dip->cl_connect_time)); } if (stat(dst, &sb) < 0) { @@ -1019,7 +1026,7 @@ static void fill_ops(struct thread_information *tip) if (net_mode == Net_client) { if (net_use_sendfile) { tip->get_subbuf = get_subbuf_sendfile; - tip->flush_subbuf = flush_subbuf_sendfile; + tip->flush_subbuf = NULL; } else { tip->get_subbuf = get_subbuf; tip->flush_subbuf = flush_subbuf_net; @@ -1271,27 +1278,36 @@ static void show_stats(struct device_information *dips, int ndips, int cpus) } static struct device_information *net_get_dip(struct net_connection *nc, - char *buts_name) + char *buts_name, u32 cl_id) { - struct device_information *dip; + struct device_information *dip, *cl_dip = NULL; + struct cl_host *ch = nc->ch; int i; - for (i = 0; i < nc->ndevs; i++) { - dip = &nc->device_information[i]; + for (i = 0; i < ch->ndevs; i++) { + dip = &ch->device_information[i]; if (!strcmp(dip->buts_name, buts_name)) return dip; + + if (dip->cl_id == cl_id) + cl_dip = dip; } - nc->device_information = realloc(nc->device_information, (nc->ndevs + 1) * sizeof(*dip)); - dip = &nc->device_information[nc->ndevs]; + ch->device_information = realloc(ch->device_information, (ch->ndevs + 1) * sizeof(*dip)); + dip = &ch->device_information[ch->ndevs]; memset(dip, 0, sizeof(*dip)); dip->fd = -1; - dip->nc = nc; + dip->ch = ch; + dip->cl_id = cl_id; + if (cl_dip) + dip->cl_connect_time = cl_dip->cl_connect_time; + else + dip->cl_connect_time = nc->connect_time; strcpy(dip->buts_name, buts_name); dip->path = strdup(buts_name); dip->trace_started = 1; - nc->ndevs++; + ch->ndevs++; dip->threads = malloc(nc->ncpus * sizeof(struct thread_information)); memset(dip->threads, 0, nc->ncpus * sizeof(struct thread_information)); @@ -1304,9 +1320,12 @@ static struct device_information *net_get_dip(struct net_connection *nc, tip->cpu = i; tip->device = dip; tip->fd = -1; - + tip->nc = nc; + if (tip_open_output(dip, tip)) return NULL; + + tip->nc = NULL; } return dip; @@ -1316,15 +1335,20 @@ static struct thread_information *net_get_tip(struct net_connection *nc, struct blktrace_net_hdr *bnh) { struct device_information *dip; + struct thread_information *tip; nc->ncpus = bnh->max_cpus; - dip = net_get_dip(nc, bnh->buts_name); + dip = net_get_dip(nc, bnh->buts_name, bnh->cl_id); if (!dip->trace_started) { fprintf(stderr, "Events for closed devices %s\n", dip->buts_name); return NULL; } - return &dip->threads[bnh->cpu]; + tip = &dip->threads[bnh->cpu]; + if (!tip->nc) + tip->nc = nc; + + return tip; } static int net_get_header(struct net_connection *nc, @@ -1360,53 +1384,85 @@ static int net_get_header(struct net_connection *nc, /* * finalize a net client: truncate files, show stats, cleanup, etc */ -static void net_client_done(struct net_connection *nc) +static void device_done(struct net_connection *nc, struct device_information *dip) { - struct device_information *dip; struct thread_information *tip; - struct net_connection *last_nc; - int i, j; + int i; - for_each_nc_dip(nc, dip, i) - __for_each_tip(dip, tip, nc->ncpus, j) - tip_ftrunc_final(tip); + __for_each_tip(dip, tip, nc->ncpus, i) + tip_ftrunc_final(tip); - show_stats(nc->device_information, nc->ndevs, nc->ncpus); + show_stats(dip, 1, nc->ncpus); /* * cleanup for next run */ - for_each_nc_dip(nc, dip, i) { - __for_each_tip(dip, tip, nc->ncpus, j) { - if (tip->ofile) - fclose(tip->ofile); - } - - free(dip->threads); - free(dip->path); + __for_each_tip(dip, tip, nc->ncpus, i) { + if (tip->ofile) + fclose(tip->ofile); } - free(nc->device_information); - nc->device_information = NULL; - nc->ncpus = nc->ndevs = 0; + free(dip->threads); + free(dip->path); close(nc->in_fd); nc->in_fd = -1; - net_connects--; + stat_shown = 0; +} - /* - * now put last entry where this one was, a little nasty since we - * need to adjust dip->nc as well - */ - if (nc->connection_index != net_connects) { - last_nc = &net_connections[net_connects]; - *nc = *last_nc; - for_each_nc_dip(nc, dip, i) - dip->nc = nc; +static inline int in_addr_eq(struct in_addr a, struct in_addr b) +{ + return a.s_addr == b.s_addr; +} + +static void net_add_client_host(struct cl_host *ch) +{ + ch->list_next = cl_host_list; + cl_host_list = ch; + cl_hosts++; +} + +static void net_remove_client_host(struct cl_host *ch) +{ + struct cl_host *p, *c; + + for (p = c = cl_host_list; c; c = c->list_next) { + if (c == ch) { + if (p == c) + cl_host_list = c->list_next; + else + p->list_next = c->list_next; + cl_hosts--; + return; + } + p = c; } +} - stat_shown = 0; +static struct cl_host *net_find_client_host(struct in_addr cl_in_addr) +{ + struct cl_host *ch = cl_host_list; + + while (ch) { + if (in_addr_eq(ch->cl_in_addr, cl_in_addr)) + return ch; + ch = ch->list_next; + } + + return NULL; +} + +/* + * finalize a net client: truncate files, show stats, cleanup, etc + */ +static void net_client_host_done(struct cl_host *ch) +{ + free(ch->device_information); + free(ch->net_connections); + net_connects -= ch->nconn; + net_remove_client_host(ch); + free(ch); } /* @@ -1429,6 +1485,7 @@ static int net_client_data(struct net_connection *nc) bnh.magic = be32_to_cpu(bnh.magic); bnh.cpu = be32_to_cpu(bnh.cpu); bnh.len = be32_to_cpu(bnh.len); + bnh.cl_id = be32_to_cpu(bnh.cl_id); } if ((bnh.magic & 0xffffff00) != BLK_IO_TRACE_MAGIC) { @@ -1445,12 +1502,15 @@ static int net_client_data(struct net_connection *nc) */ struct device_information *dip; - dip = net_get_dip(nc, bnh.buts_name); + dip = net_get_dip(nc, bnh.buts_name, bnh.cl_id); dip->drop_count = bnh.cpu; dip->trace_started = 0; printf("server: end of run for %s\n", dip->buts_name); - net_client_done(nc); + device_done(nc, dip); + + if (++nc->ch->ndevs_done == nc->ch->ndevs) + net_client_host_done(nc->ch); return 0; } @@ -1468,25 +1528,35 @@ static void net_add_connection(int listen_fd, struct sockaddr_in *addr) { socklen_t socklen = sizeof(*addr); struct net_connection *nc; + struct cl_host *ch; + int in_fd; - if (net_connects == NET_MAX_CONNECTIONS) { - fprintf(stderr, "server: no more connections allowed\n"); + in_fd = accept(listen_fd, (struct sockaddr *) addr, &socklen); + if (in_fd < 0) { + perror("accept"); return; } - nc = &net_connections[net_connects]; - memset(nc, 0, sizeof(*nc)); - - nc->in_fd = accept(listen_fd, (struct sockaddr *) addr, &socklen); - if (nc->in_fd < 0) { - perror("accept"); - return; + ch = net_find_client_host(addr->sin_addr); + if (!ch) { + if (cl_hosts == NET_MAX_CL_HOSTS) { + fprintf(stderr, "server: no more clients allowed\n"); + return; + } + ch = malloc(sizeof(struct cl_host)); + memset(ch, 0, sizeof(*ch)); + ch->cl_in_addr = addr->sin_addr; + net_add_client_host(ch); } + ch->net_connections = realloc(ch->net_connections, (ch->nconn + 1) * sizeof(*nc)); + nc = &ch->net_connections[ch->nconn++]; + memset(nc, 0, sizeof(*nc)); + printf("server: connection from %s\n", inet_ntoa(addr->sin_addr)); time(&nc->connect_time); - nc->connection_index = net_connects; - nc->cl_in_addr = addr->sin_addr; + nc->ch = ch; + nc->in_fd = in_fd; net_connects++; } @@ -1497,24 +1567,38 @@ static void net_add_connection(int listen_fd, struct sockaddr_in *addr) static void net_server_handle_connections(int listen_fd, struct sockaddr_in *addr) { - struct pollfd pfds[NET_MAX_CONNECTIONS + 1]; - int i, events; - + struct pollfd *pfds = NULL; + struct net_connection **ncs = NULL; + int max_connects = 0; + int i, nconns, events; + struct cl_host *ch; + struct net_connection *nc; + printf("server: waiting for connections...\n"); while (!is_done()) { + if (net_connects >= max_connects) { + pfds = realloc(pfds, (net_connects + 1) * sizeof(*pfds)); + ncs = realloc(ncs, (net_connects + 1) * sizeof(*ncs)); + max_connects = net_connects + 1; + } /* * the zero entry is for incoming connections, remaining * entries for clients */ pfds[0].fd = listen_fd; pfds[0].events = POLLIN; - for (i = 0; i < net_connects; i++) { - pfds[i + 1].fd = net_connections[i].in_fd; - pfds[i + 1].events = POLLIN; + nconns = 0; + for_each_cl_host(ch) { + for (i = 0; i < ch->nconn; i++) { + nc = &ch->net_connections[i]; + pfds[nconns + 1].fd = nc->in_fd; + pfds[nconns + 1].events = POLLIN; + ncs[nconns++] = nc; + } } - events = poll(pfds, 1 + net_connects, -1); + events = poll(pfds, 1 + nconns, -1); if (events < 0) { if (errno == EINTR) continue; @@ -1529,9 +1613,9 @@ static void net_server_handle_connections(int listen_fd, events--; } - for (i = 0; events && i < net_connects; i++) { + for (i = 0; events && i < nconns; i++) { if (pfds[i + 1].revents & POLLIN) { - net_client_data(&net_connections[i]); + net_client_data(ncs[i]); events--; } } @@ -1581,9 +1665,8 @@ static int net_server(void) /* * Setup outgoing network connection where we will transmit data */ -static int net_setup_client(void) +static int net_setup_client_cpu(int i, struct sockaddr_in *addr) { - struct sockaddr_in addr; int fd; fd = socket(AF_INET, SOCK_STREAM, 0); @@ -1592,6 +1675,20 @@ static int net_setup_client(void) return 1; } + if (connect(fd, (struct sockaddr *) addr, sizeof(*addr)) < 0) { + perror("client: connect"); + return 1; + } + + net_out_fd[i] = fd; + return 0; +} + +static int net_setup_client(void) +{ + struct sockaddr_in addr; + int i; + memset(&addr, 0, sizeof(addr)); addr.sin_family = AF_INET; addr.sin_port = htons(net_port); @@ -1609,13 +1706,14 @@ static int net_setup_client(void) printf("blktrace: connecting to %s\n", hostname); - if (connect(fd, (struct sockaddr *) &addr, sizeof(addr)) < 0) { - perror("client: connect"); - return 1; + net_out_fd = malloc(ncpus * sizeof(*net_out_fd)); + for (i = 0; i < ncpus; i++) { + if (net_setup_client_cpu(i, &addr)) + return 1; } printf("blktrace: connected!\n"); - net_out_fd = fd; + return 0; } @@ -1635,7 +1733,7 @@ static char usage_str[] = \ "\t-l Run in network listen mode (blktrace server)\n" \ "\t-h Run in network client mode, connecting to the given host\n" \ "\t-p Network port to use (default 8462)\n" \ - "\t-s Make the network client use sendfile() to transfer data\n" \ + "\t-s Make the network client NOT use sendfile() to transfer data\n" \ "\t-V Print program version info\n\n"; static void show_usage(char *program)