X-Git-Url: https://git.kernel.dk/?p=fio.git;a=blobdiff_plain;f=engines%2Fnet.c;h=37d44fd803d32ef578dd0a085bc3be7d9d48cd65;hp=add98dc015cc531fe2f793f0af6cdfde4e5fb417;hb=0cbb3f53ebeec45478c7b361c2a84092da93e4a8;hpb=49ccb8c10036ca0dca1238558c390f61f769c3c9 diff --git a/engines/net.c b/engines/net.c index add98dc0..37d44fd8 100644 --- a/engines/net.c +++ b/engines/net.c @@ -21,14 +21,19 @@ #include #include "../fio.h" +#include "../verify.h" +#include "../optgroup.h" struct netio_data { int listenfd; int use_splice; + int seq_off; int pipes[2]; struct sockaddr_in addr; struct sockaddr_in6 addr6; struct sockaddr_un addr_un; + uint64_t udp_send_seq; + uint64_t udp_recv_seq; }; struct netio_options { @@ -39,6 +44,8 @@ struct netio_options { unsigned int pingpong; unsigned int nodelay; unsigned int ttl; + unsigned int window_size; + unsigned int mss; char *intfc; }; @@ -47,10 +54,17 @@ struct udp_close_msg { uint32_t cmd; }; +struct udp_seq { + uint64_t magic; + uint64_t seq; + uint64_t bs; +}; + enum { FIO_LINK_CLOSE = 0x89, FIO_LINK_OPEN_CLOSE_MAGIC = 0x6c696e6b, FIO_LINK_OPEN = 0x98, + FIO_UDP_SEQ_MAGIC = 0x657375716e556563ULL, FIO_TYPE_TCP = 1, FIO_TYPE_UDP = 2, @@ -94,18 +108,22 @@ static struct fio_option options[] = { .oval = FIO_TYPE_TCP, .help = "Transmission Control Protocol", }, +#ifdef CONFIG_IPV6 { .ival = "tcpv6", .oval = FIO_TYPE_TCP_V6, .help = "Transmission Control Protocol V6", }, +#endif { .ival = "udp", .oval = FIO_TYPE_UDP, .help = "User Datagram Protocol", }, +#ifdef CONFIG_IPV6 { .ival = "udpv6", .oval = FIO_TYPE_UDP_V6, .help = "User Datagram Protocol V6", }, +#endif { .ival = "unix", .oval = FIO_TYPE_UNIX, .help = "UNIX domain socket", @@ -117,6 +135,7 @@ static struct fio_option options[] = { #ifdef CONFIG_TCP_NODELAY { .name = "nodelay", + .lname = "No Delay", .type = FIO_OPT_BOOL, .off1 = offsetof(struct netio_options, nodelay), .help = "Use TCP_NODELAY on TCP connections", @@ -135,6 +154,7 @@ static struct fio_option options[] = { }, { .name = "pingpong", + .lname = "Ping Pong", .type = FIO_OPT_STR_SET, .off1 = offsetof(struct netio_options, pingpong), .help = "Ping-pong IO requests", @@ -161,6 +181,30 @@ static struct fio_option options[] = { .category = FIO_OPT_C_ENGINE, .group = FIO_OPT_G_NETIO, }, +#ifdef CONFIG_NET_WINDOWSIZE + { + .name = "window_size", + .lname = "Window Size", + .type = FIO_OPT_INT, + .off1 = offsetof(struct netio_options, window_size), + .minval = 0, + .help = "Set socket buffer window size", + .category = FIO_OPT_C_ENGINE, + .group = FIO_OPT_G_NETIO, + }, +#endif +#ifdef CONFIG_NET_MSS + { + .name = "mss", + .lname = "Maximum segment size", + .type = FIO_OPT_INT, + .off1 = offsetof(struct netio_options, mss), + .minval = 0, + .help = "Set TCP maximum segment size", + .category = FIO_OPT_C_ENGINE, + .group = FIO_OPT_G_NETIO, + }, +#endif { .name = NULL, }, @@ -181,6 +225,65 @@ static inline int is_ipv6(struct netio_options *o) return o->proto == FIO_TYPE_UDP_V6 || o->proto == FIO_TYPE_TCP_V6; } +static int set_window_size(struct thread_data *td, int fd) +{ +#ifdef CONFIG_NET_WINDOWSIZE + struct netio_options *o = td->eo; + unsigned int wss; + int snd, rcv, ret; + + if (!o->window_size) + return 0; + + rcv = o->listen || o->pingpong; + snd = !o->listen || o->pingpong; + wss = o->window_size; + ret = 0; + + if (rcv) { + ret = setsockopt(fd, SOL_SOCKET, SO_RCVBUF, (void *) &wss, + sizeof(wss)); + if (ret < 0) + td_verror(td, errno, "rcvbuf window size"); + } + if (snd && !ret) { + ret = setsockopt(fd, SOL_SOCKET, SO_SNDBUF, (void *) &wss, + sizeof(wss)); + if (ret < 0) + td_verror(td, errno, "sndbuf window size"); + } + + return ret; +#else + td_verror(td, -EINVAL, "setsockopt window size"); + return -1; +#endif +} + +static int set_mss(struct thread_data *td, int fd) +{ +#ifdef CONFIG_NET_MSS + struct netio_options *o = td->eo; + unsigned int mss; + int ret; + + if (!o->mss || !is_tcp(o)) + return 0; + + mss = o->mss; + ret = setsockopt(fd, IPPROTO_TCP, TCP_MAXSEG, (void *) &mss, + sizeof(mss)); + if (ret < 0) + td_verror(td, errno, "setsockopt TCP_MAXSEG"); + + return ret; +#else + td_verror(td, -EINVAL, "setsockopt TCP_MAXSEG"); + return -1; +#endif +} + + /* * Return -1 for error and 'nr events' for a positive number * of events @@ -273,7 +376,7 @@ static int splice_io_u(int fdin, int fdout, unsigned int len) */ static int splice_in(struct thread_data *td, struct io_u *io_u) { - struct netio_data *nd = td->io_ops->data; + struct netio_data *nd = td->io_ops_data; return splice_io_u(io_u->file->fd, nd->pipes[1], io_u->xfer_buflen); } @@ -284,7 +387,7 @@ static int splice_in(struct thread_data *td, struct io_u *io_u) static int splice_out(struct thread_data *td, struct io_u *io_u, unsigned int len) { - struct netio_data *nd = td->io_ops->data; + struct netio_data *nd = td->io_ops_data; return splice_io_u(nd->pipes[0], io_u->file->fd, len); } @@ -322,7 +425,7 @@ static int vmsplice_io_u(struct io_u *io_u, int fd, unsigned int len) static int vmsplice_io_u_out(struct thread_data *td, struct io_u *io_u, unsigned int len) { - struct netio_data *nd = td->io_ops->data; + struct netio_data *nd = td->io_ops_data; return vmsplice_io_u(io_u, nd->pipes[0], len); } @@ -332,7 +435,7 @@ static int vmsplice_io_u_out(struct thread_data *td, struct io_u *io_u, */ static int vmsplice_io_u_in(struct thread_data *td, struct io_u *io_u) { - struct netio_data *nd = td->io_ops->data; + struct netio_data *nd = td->io_ops_data; return vmsplice_io_u(io_u, nd->pipes[1], io_u->xfer_buflen); } @@ -380,15 +483,56 @@ static int fio_netio_splice_out(struct thread_data *td, struct io_u *io_u) } #endif +static void store_udp_seq(struct netio_data *nd, struct io_u *io_u) +{ + struct udp_seq *us; + + if (io_u->xfer_buflen < sizeof(*us)) + return; + + us = io_u->xfer_buf + io_u->xfer_buflen - sizeof(*us); + us->magic = cpu_to_le64((uint64_t) FIO_UDP_SEQ_MAGIC); + us->bs = cpu_to_le64((uint64_t) io_u->xfer_buflen); + us->seq = cpu_to_le64(nd->udp_send_seq++); +} + +static void verify_udp_seq(struct thread_data *td, struct netio_data *nd, + struct io_u *io_u) +{ + struct udp_seq *us; + uint64_t seq; + + if (io_u->xfer_buflen < sizeof(*us)) + return; + + if (nd->seq_off) + return; + + us = io_u->xfer_buf + io_u->xfer_buflen - sizeof(*us); + if (le64_to_cpu(us->magic) != FIO_UDP_SEQ_MAGIC) + return; + if (le64_to_cpu(us->bs) != io_u->xfer_buflen) { + nd->seq_off = 1; + return; + } + + seq = le64_to_cpu(us->seq); + + if (seq != nd->udp_recv_seq) + td->ts.drop_io_u[io_u->ddir] += seq - nd->udp_recv_seq; + + nd->udp_recv_seq = seq + 1; +} + static int fio_netio_send(struct thread_data *td, struct io_u *io_u) { - struct netio_data *nd = td->io_ops->data; + struct netio_data *nd = td->io_ops_data; struct netio_options *o = td->eo; int ret, flags = 0; do { if (is_udp(o)) { - struct sockaddr *to; + const struct sockaddr *to; socklen_t len; if (is_ipv6(o)) { @@ -399,6 +543,9 @@ static int fio_netio_send(struct thread_data *td, struct io_u *io_u) len = sizeof(nd->addr); } + if (td->o.verify == VERIFY_NONE) + store_udp_seq(nd, io_u); + ret = sendto(io_u->file->fd, io_u->xfer_buf, io_u->xfer_buflen, flags, to, len); } else { @@ -424,7 +571,7 @@ static int fio_netio_send(struct thread_data *td, struct io_u *io_u) return ret; } -static int is_udp_close(struct io_u *io_u, int len) +static int is_close_msg(struct io_u *io_u, int len) { struct udp_close_msg *msg; @@ -432,9 +579,9 @@ static int is_udp_close(struct io_u *io_u, int len) return 0; msg = io_u->xfer_buf; - if (ntohl(msg->magic) != FIO_LINK_OPEN_CLOSE_MAGIC) + if (le32_to_cpu(msg->magic) != FIO_LINK_OPEN_CLOSE_MAGIC) return 0; - if (ntohl(msg->cmd) != FIO_LINK_CLOSE) + if (le32_to_cpu(msg->cmd) != FIO_LINK_CLOSE) return 0; return 1; @@ -442,7 +589,7 @@ static int is_udp_close(struct io_u *io_u, int len) static int fio_netio_recv(struct thread_data *td, struct io_u *io_u) { - struct netio_data *nd = td->io_ops->data; + struct netio_data *nd = td->io_ops_data; struct netio_options *o = td->eo; int ret, flags = 0; @@ -466,13 +613,19 @@ static int fio_netio_recv(struct thread_data *td, struct io_u *io_u) ret = recvfrom(io_u->file->fd, io_u->xfer_buf, io_u->xfer_buflen, flags, from, len); - if (is_udp_close(io_u, ret)) { + + if (is_close_msg(io_u, ret)) { td->done = 1; return 0; } } else { ret = recv(io_u->file->fd, io_u->xfer_buf, io_u->xfer_buflen, flags); + + if (is_close_msg(io_u, ret)) { + td->done = 1; + return 0; + } } if (ret > 0) break; @@ -485,13 +638,16 @@ static int fio_netio_recv(struct thread_data *td, struct io_u *io_u) flags |= MSG_WAITALL; } while (1); + if (is_udp(o) && td->o.verify == VERIFY_NONE) + verify_udp_seq(td, nd, io_u); + return ret; } static int __fio_netio_queue(struct thread_data *td, struct io_u *io_u, enum fio_ddir ddir) { - struct netio_data *nd = td->io_ops->data; + struct netio_data *nd = td->io_ops_data; struct netio_options *o = td->eo; int ret; @@ -511,11 +667,13 @@ static int __fio_netio_queue(struct thread_data *td, struct io_u *io_u, ret = 0; /* must be a SYNC */ if (ret != (int) io_u->xfer_buflen) { - if (ret >= 0) { + if (ret > 0) { io_u->resid = io_u->xfer_buflen - ret; io_u->error = 0; return FIO_Q_COMPLETED; - } else { + } else if (!ret) + return FIO_Q_BUSY; + else { int err = errno; if (ddir == DDIR_WRITE && err == EMSGSIZE) @@ -555,7 +713,7 @@ static int fio_netio_queue(struct thread_data *td, struct io_u *io_u) static int fio_netio_connect(struct thread_data *td, struct fio_file *f) { - struct netio_data *nd = td->io_ops->data; + struct netio_data *nd = td->io_ops_data; struct netio_options *o = td->eo; int type, domain; @@ -597,6 +755,15 @@ static int fio_netio_connect(struct thread_data *td, struct fio_file *f) } #endif + if (set_window_size(td, f->fd)) { + close(f->fd); + return 1; + } + if (set_mss(td, f->fd)) { + close(f->fd); + return 1; + } + if (is_udp(o)) { if (!fio_netio_is_multicast(td->o.filename)) return 0; @@ -661,7 +828,7 @@ static int fio_netio_connect(struct thread_data *td, struct fio_file *f) static int fio_netio_accept(struct thread_data *td, struct fio_file *f) { - struct netio_data *nd = td->io_ops->data; + struct netio_data *nd = td->io_ops_data; struct netio_options *o = td->eo; socklen_t socklen; int state; @@ -711,9 +878,9 @@ err: return 1; } -static void fio_netio_udp_close(struct thread_data *td, struct fio_file *f) +static void fio_netio_send_close(struct thread_data *td, struct fio_file *f) { - struct netio_data *nd = td->io_ops->data; + struct netio_data *nd = td->io_ops_data; struct netio_options *o = td->eo; struct udp_close_msg msg; struct sockaddr *to; @@ -728,8 +895,8 @@ static void fio_netio_udp_close(struct thread_data *td, struct fio_file *f) len = sizeof(nd->addr); } - msg.magic = htonl(FIO_LINK_OPEN_CLOSE_MAGIC); - msg.cmd = htonl(FIO_LINK_CLOSE); + msg.magic = cpu_to_le32((uint32_t) FIO_LINK_OPEN_CLOSE_MAGIC); + msg.cmd = cpu_to_le32((uint32_t) FIO_LINK_CLOSE); ret = sendto(f->fd, (void *) &msg, sizeof(msg), MSG_WAITALL, to, len); if (ret < 0) @@ -738,21 +905,17 @@ static void fio_netio_udp_close(struct thread_data *td, struct fio_file *f) static int fio_netio_close_file(struct thread_data *td, struct fio_file *f) { - struct netio_options *o = td->eo; - /* - * If this is an UDP connection, notify the receiver that we are - * closing down the link + * Notify the receiver that we are closing down the link */ - if (is_udp(o)) - fio_netio_udp_close(td, f); + fio_netio_send_close(td, f); return generic_close_file(td, f); } static int fio_netio_udp_recv_open(struct thread_data *td, struct fio_file *f) { - struct netio_data *nd = td->io_ops->data; + struct netio_data *nd = td->io_ops_data; struct netio_options *o = td->eo; struct udp_close_msg msg; struct sockaddr *to; @@ -780,12 +943,13 @@ static int fio_netio_udp_recv_open(struct thread_data *td, struct fio_file *f) return -1; } + fio_gettime(&td->start, NULL); return 0; } -static int fio_netio_udp_send_open(struct thread_data *td, struct fio_file *f) +static int fio_netio_send_open(struct thread_data *td, struct fio_file *f) { - struct netio_data *nd = td->io_ops->data; + struct netio_data *nd = td->io_ops_data; struct netio_options *o = td->eo; struct udp_close_msg msg; struct sockaddr *to; @@ -829,7 +993,7 @@ static int fio_netio_open_file(struct thread_data *td, struct fio_file *f) if (is_udp(o)) { if (td_write(td)) - ret = fio_netio_udp_send_open(td, f); + ret = fio_netio_send_open(td, f); else { int state; @@ -846,11 +1010,52 @@ static int fio_netio_open_file(struct thread_data *td, struct fio_file *f) return ret; } +static int fio_fill_addr(struct thread_data *td, const char *host, int af, + void *dst, struct addrinfo **res) +{ + struct netio_options *o = td->eo; + struct addrinfo hints; + int ret; + + if (inet_pton(af, host, dst)) + return 0; + + memset(&hints, 0, sizeof(hints)); + + if (is_tcp(o)) + hints.ai_socktype = SOCK_STREAM; + else + hints.ai_socktype = SOCK_DGRAM; + + if (is_ipv6(o)) + hints.ai_family = AF_INET6; + else + hints.ai_family = AF_INET; + + ret = getaddrinfo(host, NULL, &hints, res); + if (ret) { + int e = EINVAL; + char str[128]; + + if (ret == EAI_SYSTEM) + e = errno; + + snprintf(str, sizeof(str), "getaddrinfo: %s", gai_strerror(ret)); + td_verror(td, e, str); + return 1; + } + + return 0; +} + static int fio_netio_setup_connect_inet(struct thread_data *td, const char *host, unsigned short port) { - struct netio_data *nd = td->io_ops->data; + struct netio_data *nd = td->io_ops_data; struct netio_options *o = td->eo; + struct addrinfo *res = NULL; + void *dst, *src; + int af, len; if (!host) { log_err("fio: connect with no host to connect to.\n"); @@ -861,55 +1066,47 @@ static int fio_netio_setup_connect_inet(struct thread_data *td, return 1; } - if (is_ipv6(o)) { - nd->addr6.sin6_family = AF_INET6; - nd->addr6.sin6_port = htons(port); - - if (!inet_pton(AF_INET6, host, &nd->addr6.sin6_addr)) { - struct addrinfo hints, *res; + nd->addr.sin_family = AF_INET; + nd->addr.sin_port = htons(port); + nd->addr6.sin6_family = AF_INET6; + nd->addr6.sin6_port = htons(port); - memset(&hints, 0, sizeof(hints)); - hints.ai_socktype = SOCK_STREAM; - - if (getaddrinfo(host, NULL, &hints, &res)) { - td_verror(td, errno, "gethostbyname"); - return 1; - } - - memcpy(&nd->addr6.sin6_addr, &((struct sockaddr_in6 *) res->ai_addr)->sin6_addr, sizeof(nd->addr6.sin6_addr)); - freeaddrinfo(res); - } + if (is_ipv6(o)) { + af = AF_INET6; + dst = &nd->addr6.sin6_addr; } else { - nd->addr.sin_family = AF_INET; - nd->addr.sin_port = htons(port); - - if (!inet_pton(AF_INET, host, &nd->addr.sin_addr)) { - struct addrinfo hints, *res; + af = AF_INET; + dst = &nd->addr.sin_addr; + } - memset(&hints, 0, sizeof(hints)); - hints.ai_socktype = SOCK_STREAM; + if (fio_fill_addr(td, host, af, dst, &res)) + return 1; - if (getaddrinfo(host, NULL, &hints, &res)) { - td_verror(td, errno, "gethostbyname"); - return 1; - } + if (!res) + return 0; - memcpy(&nd->addr.sin_addr, &((struct sockaddr_in *) res->ai_addr)->sin_addr, sizeof(nd->addr.sin_addr)); - freeaddrinfo(res); - } + if (is_ipv6(o)) { + len = sizeof(nd->addr6.sin6_addr); + src = &((struct sockaddr_in6 *) res->ai_addr)->sin6_addr; + } else { + len = sizeof(nd->addr.sin_addr); + src = &((struct sockaddr_in *) res->ai_addr)->sin_addr; } + memcpy(dst, src, len); + freeaddrinfo(res); return 0; } static int fio_netio_setup_connect_unix(struct thread_data *td, const char *path) { - struct netio_data *nd = td->io_ops->data; + struct netio_data *nd = td->io_ops_data; struct sockaddr_un *soun = &nd->addr_un; soun->sun_family = AF_UNIX; - strcpy(soun->sun_path, path); + memset(soun->sun_path, 0, sizeof(soun->sun_path)); + strncpy(soun->sun_path, path, sizeof(soun->sun_path) - 1); return 0; } @@ -925,7 +1122,7 @@ static int fio_netio_setup_connect(struct thread_data *td) static int fio_netio_setup_listen_unix(struct thread_data *td, const char *path) { - struct netio_data *nd = td->io_ops->data; + struct netio_data *nd = td->io_ops_data; struct sockaddr_un *addr = &nd->addr_un; mode_t mode; int len, fd; @@ -940,7 +1137,7 @@ static int fio_netio_setup_listen_unix(struct thread_data *td, const char *path) memset(addr, 0, sizeof(*addr)); addr->sun_family = AF_UNIX; - strcpy(addr->sun_path, path); + strncpy(addr->sun_path, path, sizeof(addr->sun_path) - 1); unlink(path); len = sizeof(addr->sun_family) + strlen(path) + 1; @@ -958,17 +1155,15 @@ static int fio_netio_setup_listen_unix(struct thread_data *td, const char *path) static int fio_netio_setup_listen_inet(struct thread_data *td, short port) { - struct netio_data *nd = td->io_ops->data; + struct netio_data *nd = td->io_ops_data; struct netio_options *o = td->eo; struct ip_mreq mr; struct sockaddr_in sin; - struct sockaddr_in6 sin6; struct sockaddr *saddr; int fd, opt, type, domain; socklen_t len; memset(&sin, 0, sizeof(sin)); - memset(&sin6, 0, sizeof(sin6)); if (o->proto == FIO_TYPE_TCP) { type = SOCK_STREAM; @@ -1007,12 +1202,26 @@ static int fio_netio_setup_listen_inet(struct thread_data *td, short port) } #endif - if (td->o.filename){ + if (set_window_size(td, fd)) { + close(fd); + return 1; + } + if (set_mss(td, fd)) { + close(fd); + return 1; + } + + if (td->o.filename) { if (!is_udp(o) || !fio_netio_is_multicast(td->o.filename)) { log_err("fio: hostname not valid for non-multicast inbound network IO\n"); close(fd); return 1; } + if (is_ipv6(o)) { + log_err("fio: IPv6 not supported for multicast network IO\n"); + close(fd); + return 1; + } inet_aton(td->o.filename, &sin.sin_addr); @@ -1026,6 +1235,7 @@ static int fio_netio_setup_listen_inet(struct thread_data *td, short port) } else { mr.imr_interface.s_addr = htonl(INADDR_ANY); } + if (setsockopt(fd, IPPROTO_IP, IP_ADD_MEMBERSHIP, (const char*)&mr, sizeof(mr)) < 0) { td_verror(td, errno, "setsockopt IP_ADD_MEMBERSHIP"); close(fd); @@ -1045,11 +1255,12 @@ static int fio_netio_setup_listen_inet(struct thread_data *td, short port) len = sizeof(nd->addr6); nd->addr6.sin6_family = AF_INET6; - nd->addr6.sin6_addr = sin6.sin6_addr.s6_addr ? sin6.sin6_addr : in6addr_any; + nd->addr6.sin6_addr = in6addr_any; nd->addr6.sin6_port = htons(port); } if (bind(fd, saddr, len) < 0) { + close(fd); td_verror(td, errno, "bind"); return 1; } @@ -1060,7 +1271,7 @@ static int fio_netio_setup_listen_inet(struct thread_data *td, short port) static int fio_netio_setup_listen(struct thread_data *td) { - struct netio_data *nd = td->io_ops->data; + struct netio_data *nd = td->io_ops_data; struct netio_options *o = td->eo; int ret; @@ -1106,6 +1317,8 @@ static int fio_netio_init(struct thread_data *td) return 1; } + o->port += td->subjob_number; + if (!is_tcp(o)) { if (o->listen) { log_err("fio: listen only valid for TCP proto IO\n"); @@ -1133,7 +1346,7 @@ static int fio_netio_init(struct thread_data *td) static void fio_netio_cleanup(struct thread_data *td) { - struct netio_data *nd = td->io_ops->data; + struct netio_data *nd = td->io_ops_data; if (nd) { if (nd->listenfd != -1) @@ -1152,17 +1365,18 @@ static int fio_netio_setup(struct thread_data *td) struct netio_data *nd; if (!td->files_index) { - add_file(td, td->o.filename ?: "net"); + add_file(td, td->o.filename ?: "net", 0, 0); td->o.nr_files = td->o.nr_files ?: 1; + td->o.open_files++; } - if (!td->io_ops->data) { - nd = malloc(sizeof(*nd));; + if (!td->io_ops_data) { + nd = malloc(sizeof(*nd)); memset(nd, 0, sizeof(*nd)); nd->listenfd = -1; nd->pipes[0] = nd->pipes[1] = -1; - td->io_ops->data = nd; + td->io_ops_data = nd; } return 0; @@ -1170,7 +1384,7 @@ static int fio_netio_setup(struct thread_data *td) static void fio_netio_terminate(struct thread_data *td) { - kill(td->pid, SIGUSR2); + kill(td->pid, SIGTERM); } #ifdef CONFIG_LINUX_SPLICE @@ -1180,7 +1394,7 @@ static int fio_netio_setup_splice(struct thread_data *td) fio_netio_setup(td); - nd = td->io_ops->data; + nd = td->io_ops_data; if (nd) { if (pipe(nd->pipes) < 0) return 1;