X-Git-Url: https://git.kernel.dk/?a=blobdiff_plain;f=engines%2Fnet.c;h=7a0fe696c1b81c7ff0dba9539387a0f16c5d7bdb;hb=d8b64af2a1688c3c3d218cba0faad57a3ba050e8;hp=30f66470185e97b4c224f344d0a4cc118add98ff;hpb=3bcb9d945bf8859cfcca50a2899b1af6ae849c02;p=fio.git diff --git a/engines/net.c b/engines/net.c index 30f66470..7a0fe696 100644 --- a/engines/net.c +++ b/engines/net.c @@ -26,6 +26,7 @@ struct netio_data { int listenfd; int use_splice; + int seq_off; int pipes[2]; struct sockaddr_in addr; struct sockaddr_in6 addr6; @@ -55,6 +56,7 @@ struct udp_close_msg { struct udp_seq { uint64_t magic; uint64_t seq; + uint64_t bs; }; enum { @@ -483,7 +485,8 @@ static void store_udp_seq(struct netio_data *nd, struct io_u *io_u) struct udp_seq *us; us = io_u->xfer_buf + io_u->xfer_buflen - sizeof(*us); - us->magic = cpu_to_le64(FIO_UDP_SEQ_MAGIC); + us->magic = cpu_to_le64((uint64_t) FIO_UDP_SEQ_MAGIC); + us->bs = cpu_to_le64((uint64_t) io_u->xfer_buflen); us->seq = cpu_to_le64(nd->udp_send_seq++); } @@ -493,9 +496,16 @@ static void verify_udp_seq(struct thread_data *td, struct netio_data *nd, struct udp_seq *us; uint64_t seq; + if (nd->seq_off) + return; + us = io_u->xfer_buf + io_u->xfer_buflen - sizeof(*us); if (le64_to_cpu(us->magic) != FIO_UDP_SEQ_MAGIC) return; + if (le64_to_cpu(us->bs) != io_u->xfer_buflen) { + nd->seq_off = 1; + return; + } seq = le64_to_cpu(us->seq); @@ -552,7 +562,7 @@ static int fio_netio_send(struct thread_data *td, struct io_u *io_u) return ret; } -static int is_udp_close(struct io_u *io_u, int len) +static int is_close_msg(struct io_u *io_u, int len) { struct udp_close_msg *msg; @@ -560,9 +570,9 @@ static int is_udp_close(struct io_u *io_u, int len) return 0; msg = io_u->xfer_buf; - if (ntohl(msg->magic) != FIO_LINK_OPEN_CLOSE_MAGIC) + if (le32_to_cpu(msg->magic) != FIO_LINK_OPEN_CLOSE_MAGIC) return 0; - if (ntohl(msg->cmd) != FIO_LINK_CLOSE) + if (le32_to_cpu(msg->cmd) != FIO_LINK_CLOSE) return 0; return 1; @@ -595,13 +605,18 @@ static int fio_netio_recv(struct thread_data *td, struct io_u *io_u) ret = recvfrom(io_u->file->fd, io_u->xfer_buf, io_u->xfer_buflen, flags, from, len); - if (is_udp_close(io_u, ret)) { + if (is_close_msg(io_u, ret)) { td->done = 1; return 0; } } else { ret = recv(io_u->file->fd, io_u->xfer_buf, io_u->xfer_buflen, flags); + + if (is_close_msg(io_u, ret)) { + td->done = 1; + return 0; + } } if (ret > 0) break; @@ -854,7 +869,7 @@ err: return 1; } -static void fio_netio_udp_close(struct thread_data *td, struct fio_file *f) +static void fio_netio_send_close(struct thread_data *td, struct fio_file *f) { struct netio_data *nd = td->io_ops->data; struct netio_options *o = td->eo; @@ -871,8 +886,8 @@ static void fio_netio_udp_close(struct thread_data *td, struct fio_file *f) len = sizeof(nd->addr); } - msg.magic = htonl(FIO_LINK_OPEN_CLOSE_MAGIC); - msg.cmd = htonl(FIO_LINK_CLOSE); + msg.magic = cpu_to_le32((uint32_t) FIO_LINK_OPEN_CLOSE_MAGIC); + msg.cmd = cpu_to_le32((uint32_t) FIO_LINK_CLOSE); ret = sendto(f->fd, (void *) &msg, sizeof(msg), MSG_WAITALL, to, len); if (ret < 0) @@ -881,14 +896,10 @@ static void fio_netio_udp_close(struct thread_data *td, struct fio_file *f) static int fio_netio_close_file(struct thread_data *td, struct fio_file *f) { - struct netio_options *o = td->eo; - /* - * If this is an UDP connection, notify the receiver that we are - * closing down the link + * Notify the receiver that we are closing down the link */ - if (is_udp(o)) - fio_netio_udp_close(td, f); + fio_netio_send_close(td, f); return generic_close_file(td, f); } @@ -927,7 +938,7 @@ static int fio_netio_udp_recv_open(struct thread_data *td, struct fio_file *f) return 0; } -static int fio_netio_udp_send_open(struct thread_data *td, struct fio_file *f) +static int fio_netio_send_open(struct thread_data *td, struct fio_file *f) { struct netio_data *nd = td->io_ops->data; struct netio_options *o = td->eo; @@ -973,7 +984,7 @@ static int fio_netio_open_file(struct thread_data *td, struct fio_file *f) if (is_udp(o)) { if (td_write(td)) - ret = fio_netio_udp_send_open(td, f); + ret = fio_netio_send_open(td, f); else { int state; @@ -1297,6 +1308,8 @@ static int fio_netio_init(struct thread_data *td) return 1; } + o->port += td->subjob_number; + if (!is_tcp(o)) { if (o->listen) { log_err("fio: listen only valid for TCP proto IO\n");