char buts_name[32];
volatile int trace_started;
unsigned long drop_count;
- struct in_addr cl_in_addr;
struct thread_information *threads;
+ struct net_connection *nc;
};
static int ncpus;
static struct thread_information *thread_information;
static int ndevs;
static struct device_information *device_information;
-static int ndevs_running;
/* command line option globals */
static char *relay_path;
#define dip_tracing(dip) (*(volatile int *)(&(dip)->trace_started))
#define dip_set_tracing(dip, v) ((dip)->trace_started = (v))
-#define __for_each_dip(__d, __i, __e) \
- for (__i = 0, __d = device_information; __i < __e; __i++, __d++)
+#define __for_each_dip(__d, __i, __di, __e) \
+ for (__i = 0, __d = __di; __i < __e; __i++, __d++)
+
+#define for_each_dip(__d, __i) \
+ __for_each_dip(__d, __i, device_information, ndevs)
+#define for_each_nc_dip(__nc, __d, __i) \
+ __for_each_dip(__d, __i, (__nc)->device_information, (__nc)->ndevs)
-#define for_each_dip(__d, __i) __for_each_dip(__d, __i, ndevs)
#define for_each_tip(__d, __t, __j) \
for (__j = 0, __t = (__d)->threads; __j < ncpus; __j++, __t++)
static int net_mode = 0;
static int net_use_sendfile;
-static int net_in_fd = -1;
-static time_t net_connect_time;
+struct net_connection {
+ int in_fd;
+ time_t connect_time;
+ struct in_addr cl_in_addr;
+ struct device_information *device_information;
+ int ndevs;
+ int ncpus;
+ int connection_index;
+};
+
+#define NET_MAX_CONNECTIONS (1024)
+static struct net_connection net_connections[NET_MAX_CONNECTIONS];
+static int net_connects;
static int net_out_fd = -1;
static void handle_sigint(__attribute__((__unused__)) int sig)
static int read_data_net(struct thread_information *tip, void *buf,
unsigned int len)
{
+ struct net_connection *nc = tip->device->nc;
unsigned int bytes_left = len;
int ret = 0;
do {
- ret = recv(net_in_fd, buf, bytes_left, MSG_WAITALL);
+ ret = recv(nc->in_fd, buf, bytes_left, MSG_WAITALL);
if (!ret)
continue;
len = sprintf(dst, "%s/", output_dir);
if (net_mode == Net_server) {
- len += sprintf(dst + len, "%s-", inet_ntoa(dip->cl_in_addr));
- len += strftime(dst + len, 64, "%F-%T/", gmtime(&net_connect_time));
+ struct net_connection *nc = dip->nc;
+
+ len += sprintf(dst + len, "%s-", inet_ntoa(nc->cl_in_addr));
+ len += strftime(dst + len, 64, "%F-%T/", gmtime(&nc->connect_time));
}
if (stat(dst, &sb) < 0) {
}
if (i != ndevs) {
- __for_each_dip(dip, j, i)
+ __for_each_dip(dip, j, device_information, i)
stop_trace(dip);
return 1;
}
if (i != ndevs) {
- __for_each_dip(dip, j, i)
+ __for_each_dip(dip, j, device_information, i)
stop_threads(dip);
for_each_dip(dip, i)
stop_trace(dip);
return 0;
}
-static void show_stats(void)
+static void show_stats(struct device_information *dips, int ndips, int cpus)
{
struct device_information *dip;
struct thread_information *tip;
stat_shown = 1;
total_drops = 0;
- for_each_dip(dip, i) {
+ for (i = 0; i < ndips; i++) {
+ dip = &dips[i];
if (!no_stdout)
printf("Device: %s\n", dip->path);
events_processed = 0;
data_read = 0;
- for_each_tip(dip, tip, j) {
+ for (j = 0; j < cpus; j++) {
+ tip = &dip->threads[j];
if (!no_stdout)
printf(" CPU%3d: %20lu events, %8llu KiB data\n",
tip->cpu, tip->events_processed,
fprintf(stderr, "You have dropped events, consider using a larger buffer size (-b)\n");
}
-static struct device_information *net_get_dip(char *buts_name,
- struct in_addr *cl_in_addr)
+static struct device_information *net_get_dip(struct net_connection *nc,
+ char *buts_name)
{
struct device_information *dip;
int i;
- for (i = 0; i < ndevs; i++) {
- dip = &device_information[i];
+ for (i = 0; i < nc->ndevs; i++) {
+ dip = &nc->device_information[i];
if (!strcmp(dip->buts_name, buts_name))
return dip;
}
- device_information = realloc(device_information, (ndevs + 1) * sizeof(*dip));
- dip = &device_information[ndevs];
+ nc->device_information = realloc(device_information, (nc->ndevs + 1) * sizeof(*dip));
+ dip = &nc->device_information[nc->ndevs];
memset(dip, 0, sizeof(*dip));
dip->fd = -1;
- dip->cl_in_addr = *cl_in_addr;
+ dip->nc = nc;
strcpy(dip->buts_name, buts_name);
dip->path = strdup(buts_name);
dip->trace_started = 1;
- ndevs++;
- ndevs_running++;
- dip->threads = malloc(ncpus * sizeof(struct thread_information));
- memset(dip->threads, 0, ncpus * sizeof(struct thread_information));
+ nc->ndevs++;
+ dip->threads = malloc(nc->ncpus * sizeof(struct thread_information));
+ memset(dip->threads, 0, nc->ncpus * sizeof(struct thread_information));
/*
* open all files
*/
- for (i = 0; i < ncpus; i++) {
+ for (i = 0; i < nc->ncpus; i++) {
struct thread_information *tip = &dip->threads[i];
tip->cpu = i;
return dip;
}
-static struct thread_information *net_get_tip(struct blktrace_net_hdr *bnh,
- struct in_addr *cl_in_addr)
+static struct thread_information *net_get_tip(struct net_connection *nc,
+ struct blktrace_net_hdr *bnh)
{
struct device_information *dip;
- ncpus = bnh->max_cpus;
- dip = net_get_dip(bnh->buts_name, cl_in_addr);
+ nc->ncpus = bnh->max_cpus;
+ dip = net_get_dip(nc, bnh->buts_name);
if (!dip->trace_started) {
fprintf(stderr, "Events for closed devices %s\n", dip->buts_name);
return NULL;
return &dip->threads[bnh->cpu];
}
-static int net_get_header(struct blktrace_net_hdr *bnh)
+static int net_get_header(struct net_connection *nc,
+ struct blktrace_net_hdr *bnh)
{
- int fl = fcntl(net_in_fd, F_GETFL);
+ int fl = fcntl(nc->in_fd, F_GETFL);
int bytes_left, ret;
void *p = bnh;
- fcntl(net_in_fd, F_SETFL, fl | O_NONBLOCK);
+ fcntl(nc->in_fd, F_SETFL, fl | O_NONBLOCK);
bytes_left = sizeof(*bnh);
while (bytes_left && !is_done()) {
- ret = recv(net_in_fd, p, bytes_left, MSG_WAITALL);
+ ret = recv(nc->in_fd, p, bytes_left, MSG_WAITALL);
if (ret < 0) {
if (errno != EAGAIN) {
perror("recv header");
bytes_left -= ret;
}
}
- fcntl(net_in_fd, F_SETFL, fl & ~O_NONBLOCK);
+ fcntl(nc->in_fd, F_SETFL, fl & ~O_NONBLOCK);
return bytes_left;
}
-static int net_server_loop(struct in_addr *cl_in_addr)
+/*
+ * finalize a net client: truncate files, show stats, cleanup, etc
+ */
+static void net_client_done(struct net_connection *nc)
+{
+ struct device_information *dip;
+ struct thread_information *tip;
+ struct net_connection *last_nc;
+ int i, j;
+
+ for_each_nc_dip(nc, dip, i) {
+ for (j = 0; j < nc->ncpus; j++) {
+ tip = &dip->threads[j];
+
+ tip_ftrunc_final(tip);
+ }
+ }
+
+ show_stats(nc->device_information, nc->ndevs, nc->ncpus);
+
+ /*
+ * cleanup for next run
+ */
+ for_each_nc_dip(nc, dip, i) {
+ for (j = 0; j < nc->ncpus; j++) {
+ tip = &dip->threads[j];
+
+ fclose(tip->ofile);
+ }
+
+ free(dip->threads);
+ free(dip->path);
+ }
+
+ free(nc->device_information);
+ nc->device_information = NULL;
+ nc->ncpus = nc->ndevs = 0;
+
+ close(nc->in_fd);
+ nc->in_fd = -1;
+
+ net_connects--;
+
+ /*
+ * now put last entry where this one was, a little nasty since we
+ * need to adjust dip->nc as well
+ */
+ if (nc->connection_index != net_connects) {
+ last_nc = &net_connections[net_connects];
+ last_nc->connection_index = nc->connection_index;
+ *nc = *last_nc;
+ for_each_nc_dip(nc, dip, i)
+ dip->nc = nc;
+ }
+
+ stat_shown = 0;
+}
+
+/*
+ * handle incoming events from a net client
+ */
+static int net_client_data(struct net_connection *nc)
{
struct thread_information *tip;
struct blktrace_net_hdr bnh;
- if (net_get_header(&bnh))
+ if (net_get_header(nc, &bnh))
return 1;
if (data_is_native == -1 && check_data_endianness(bnh.magic)) {
*/
struct device_information *dip;
- dip = net_get_dip(bnh.buts_name, cl_in_addr);
+ dip = net_get_dip(nc, bnh.buts_name);
dip->drop_count = bnh.cpu;
dip->trace_started = 0;
- ndevs_running--;
- fprintf(stderr, "server: end of run for %s\n", dip->buts_name);
- return !ndevs_running;
+ printf("server: end of run for %s\n", dip->buts_name);
+ net_client_done(nc);
+ return 0;
}
- tip = net_get_tip(&bnh, cl_in_addr);
+ tip = net_get_tip(nc, &bnh);
if (!tip)
return 1;
return 0;
}
-static int get_connection(int fd, struct sockaddr_in *addr)
+static void net_add_connection(int listen_fd, struct sockaddr_in *addr)
{
- struct pollfd pfd = { .fd = fd, .events = POLLIN };
- socklen_t socklen;
-
- printf("blktrace: waiting for incoming connection...\n");
+ socklen_t socklen = sizeof(*addr);
+ struct net_connection *nc;
- if (poll(&pfd, 1, -1) < 0) {
- perror("poll for connection");
- return 1;
+ if (net_connects == NET_MAX_CONNECTIONS) {
+ fprintf(stderr, "server: no more connections allowed\n");
+ return;
}
- if ((pfd.revents & POLLIN) == 0)
- return 1;
- socklen = sizeof(*addr);
- net_in_fd = accept(fd, (struct sockaddr *) addr, &socklen);
- if (net_in_fd < 0) {
+ nc = &net_connections[net_connects];
+
+ nc->in_fd = accept(listen_fd, (struct sockaddr *) addr, &socklen);
+ if (nc->in_fd < 0) {
perror("accept");
- return 1;
+ return;
}
- time(&net_connect_time);
- printf("blktrace: connection from %s\n", inet_ntoa(addr->sin_addr));
- return 0;
+ printf("server: connection from %s\n", inet_ntoa(addr->sin_addr));
+ time(&nc->connect_time);
+ nc->connection_index = net_connects;
+ nc->cl_in_addr = addr->sin_addr;
+ net_connects++;
+}
+
+/*
+ * event driven loop, handle new incoming connections and data from
+ * existing connections
+ */
+static void net_server_handle_connections(int listen_fd,
+ struct sockaddr_in *addr)
+{
+ struct pollfd pfds[NET_MAX_CONNECTIONS + 1];
+ int i, events;
+
+ printf("server: waiting for connections...\n");
+
+ while (!is_done()) {
+ /*
+ * the zero entry is for incoming connections, remaining
+ * entries for clients
+ */
+ pfds[0].fd = listen_fd;
+ pfds[0].events = POLLIN;
+ for (i = 0; i < net_connects; i++) {
+ pfds[i + 1].fd = net_connections[i].in_fd;
+ pfds[i + 1].events = POLLIN;
+ }
+
+ events = poll(pfds, 1 + net_connects, -1);
+ if (events < 0) {
+ if (errno == EINTR)
+ continue;
+
+ perror("poll");
+ break;
+ } else if (!events)
+ continue;
+
+ if (pfds[0].revents & POLLIN) {
+ net_add_connection(listen_fd, addr);
+ events--;
+ }
+
+ for (i = 0; events && i < net_connects; i++) {
+ if (pfds[i + 1].revents & POLLIN) {
+ net_client_data(&net_connections[i]);
+ events--;
+ }
+ }
+ }
}
/*
*/
static int net_server(void)
{
- struct device_information *dip;
- struct thread_information *tip;
struct sockaddr_in addr;
- int fd, opt, i, j;
+ int fd, opt;
fd = socket(AF_INET, SOCK_STREAM, 0);
if (fd < 0) {
return 1;
}
-repeat:
- if (get_connection(fd, &addr))
- return 0;
-
- while (!is_done()) {
- if (net_server_loop(&addr.sin_addr))
- break;
- }
-
- for_each_dip(dip, i)
- for_each_tip(dip, tip, j)
- tip_ftrunc_final(tip);
-
- show_stats();
-
- if (is_done())
- return 0;
-
- /*
- * cleanup for next run
- */
- for_each_dip(dip, i) {
- for_each_tip(dip, tip, j)
- fclose(tip->ofile);
-
- free(dip->threads);
- free(dip->path);
- }
-
- free(device_information);
- device_information = NULL;
- ncpus = ndevs = 0;
-
- close(net_in_fd);
- net_in_fd = -1;
- stat_shown = 0;
- goto repeat;
+ net_server_handle_connections(fd, &addr);
+ return 0;
}
/*
stop_all_traces();
}
- show_stats();
+ show_stats(device_information, ndevs, ncpus);
return 0;
}