xsk: support for Tx
authorMagnus Karlsson <magnus.karlsson@intel.com>
Wed, 2 May 2018 11:01:34 +0000 (13:01 +0200)
committerAlexei Starovoitov <ast@kernel.org>
Thu, 3 May 2018 22:55:25 +0000 (15:55 -0700)
Here, Tx support is added. The user fills the Tx queue with frames to
be sent by the kernel, and lets the kernel know using the sendmsg
syscall.

Signed-off-by: Magnus Karlsson <magnus.karlsson@intel.com>
Signed-off-by: Alexei Starovoitov <ast@kernel.org>
net/xdp/xsk.c
net/xdp/xsk_queue.h

index 2d7b0c90d99627e186ee196319cfb0823efe6b47..b33c535c7996e4341de4cfe56aa53c99ac0810aa 100644 (file)
@@ -36,6 +36,8 @@
 #include "xsk_queue.h"
 #include "xdp_umem.h"
 
+#define TX_BATCH_SIZE 16
+
 static struct xdp_sock *xdp_sk(struct sock *sk)
 {
        return (struct xdp_sock *)sk;
@@ -101,6 +103,108 @@ int xsk_generic_rcv(struct xdp_sock *xs, struct xdp_buff *xdp)
        return err;
 }
 
+/* skb destructor for AF_XDP Tx frames.
+ *
+ * Posts the frame's umem id onto the socket's completion ring so user
+ * space learns the buffer can be reused, then drops the socket's
+ * write-space accounting via sock_wfree().
+ */
+static void xsk_destruct_skb(struct sk_buff *skb)
+{
+       u32 id = (u32)(long)skb_shinfo(skb)->destructor_arg;
+       struct xdp_sock *xs = xdp_sk(skb->sk);
+
+       /* A slot was reserved with xskq_reserve_id() before transmit,
+        * so producing here should never fail; warn if it ever does.
+        */
+       WARN_ON_ONCE(xskq_produce_id(xs->umem->cq, id));
+
+       sock_wfree(skb);
+}
+
+/* Generic (copy/SKB-mode) transmit path for an AF_XDP socket.
+ *
+ * Consumes up to TX_BATCH_SIZE descriptors from the Tx ring, copies each
+ * umem frame into a newly allocated skb and hands it to the driver with
+ * dev_direct_xmit() on the socket's bound queue. For every frame, a
+ * completion-ring slot is reserved up front; the id is produced later from
+ * xsk_destruct_skb() once the skb is freed.
+ *
+ * Returns 0 on success or a negative errno. Blocking sends (no
+ * MSG_DONTWAIT) are not supported yet.
+ */
+static int xsk_generic_xmit(struct sock *sk, struct msghdr *m,
+                           size_t total_len)
+{
+       bool need_wait = !(m->msg_flags & MSG_DONTWAIT);
+       u32 max_batch = TX_BATCH_SIZE;
+       struct xdp_sock *xs = xdp_sk(sk);
+       bool sent_frame = false;
+       struct xdp_desc desc;
+       struct sk_buff *skb;
+       int err = 0;
+
+       /* No Tx ring configured on this socket. */
+       if (unlikely(!xs->tx))
+               return -ENOBUFS;
+       /* Waiting for ring space is not implemented yet. */
+       if (need_wait)
+               return -EOPNOTSUPP;
+
+       mutex_lock(&xs->mutex);
+
+       while (xskq_peek_desc(xs->tx, &desc)) {
+               char *buffer;
+               u32 id, len;
+
+               /* Bound the work done per sendmsg call; caller retries. */
+               if (max_batch-- == 0) {
+                       err = -EAGAIN;
+                       goto out;
+               }
+
+               /* Guarantee a completion entry exists before transmit, so
+                * the skb destructor's produce cannot fail.
+                */
+               if (xskq_reserve_id(xs->umem->cq)) {
+                       err = -EAGAIN;
+                       goto out;
+               }
+
+               len = desc.len;
+               if (unlikely(len > xs->dev->mtu)) {
+                       err = -EMSGSIZE;
+                       goto out;
+               }
+
+               skb = sock_alloc_send_skb(sk, len, !need_wait, &err);
+               if (unlikely(!skb)) {
+                       err = -EAGAIN;
+                       goto out;
+               }
+
+               /* Copy the frame payload out of the umem into the skb. */
+               skb_put(skb, len);
+               id = desc.idx;
+               buffer = xdp_umem_get_data(xs->umem, id) + desc.offset;
+               err = skb_store_bits(skb, 0, buffer, len);
+               if (unlikely(err)) {
+                       kfree_skb(skb);
+                       goto out;
+               }
+
+               skb->dev = xs->dev;
+               skb->priority = sk->sk_priority;
+               skb->mark = sk->sk_mark;
+               /* Stash the umem frame id for xsk_destruct_skb(). */
+               skb_shinfo(skb)->destructor_arg = (void *)(long)id;
+               skb->destructor = xsk_destruct_skb;
+
+               err = dev_direct_xmit(skb, xs->queue_id);
+               /* Ignore NET_XMIT_CN as packet might have been sent */
+               if (err == NET_XMIT_DROP || err == NETDEV_TX_BUSY) {
+                       err = -EAGAIN;
+                       /* SKB consumed by dev_direct_xmit() */
+                       goto out;
+               }
+
+               sent_frame = true;
+               xskq_discard_desc(xs->tx);
+       }
+
+out:
+       if (sent_frame)
+               sk->sk_write_space(sk);
+
+       mutex_unlock(&xs->mutex);
+       return err;
+}
+
+/* sendmsg() entry point for AF_XDP sockets.
+ *
+ * Rejects sockets that are not bound to a device (-ENXIO) or whose
+ * device is administratively down (-ENETDOWN), then delegates to the
+ * generic copy-mode transmit path.
+ */
+static int xsk_sendmsg(struct socket *sock, struct msghdr *m, size_t total_len)
+{
+       struct sock *sk = sock->sk;
+       struct xdp_sock *xs = xdp_sk(sk);
+
+       if (unlikely(!xs->dev))
+               return -ENXIO;
+       if (unlikely(!(xs->dev->flags & IFF_UP)))
+               return -ENETDOWN;
+
+       return xsk_generic_xmit(sk, m, total_len);
+}
+
 static unsigned int xsk_poll(struct file *file, struct socket *sock,
                             struct poll_table_struct *wait)
 {
@@ -110,6 +214,8 @@ static unsigned int xsk_poll(struct file *file, struct socket *sock,
 
        if (xs->rx && !xskq_empty_desc(xs->rx))
                mask |= POLLIN | POLLRDNORM;
+       if (xs->tx && !xskq_full_desc(xs->tx))
+               mask |= POLLOUT | POLLWRNORM;
 
        return mask;
 }
@@ -270,6 +376,7 @@ static int xsk_bind(struct socket *sock, struct sockaddr *addr, int addr_len)
        xs->queue_id = sxdp->sxdp_queue_id;
 
        xskq_set_umem(xs->rx, &xs->umem->props);
+       xskq_set_umem(xs->tx, &xs->umem->props);
 
 out_unlock:
        if (err)
@@ -383,8 +490,6 @@ static int xsk_mmap(struct file *file, struct socket *sock,
                        q = xs->umem->fq;
                else if (offset == XDP_UMEM_PGOFF_COMPLETION_RING)
                        q = xs->umem->cq;
-               else
-                       return -EINVAL;
        }
 
        if (!q)
@@ -420,7 +525,7 @@ static const struct proto_ops xsk_proto_ops = {
        .shutdown =     sock_no_shutdown,
        .setsockopt =   xsk_setsockopt,
        .getsockopt =   sock_no_getsockopt,
-       .sendmsg =      sock_no_sendmsg,
+       .sendmsg =      xsk_sendmsg,
        .recvmsg =      sock_no_recvmsg,
        .mmap =         xsk_mmap,
        .sendpage =     sock_no_sendpage,
index 0a9b92b4f93a689aba87d2d370a4cd8daef45679..3497e8808608f5d4e8d9072bcc24fcb7759d92bc 100644 (file)
@@ -111,7 +111,93 @@ static inline void xskq_discard_id(struct xsk_queue *q)
        (void)xskq_validate_id(q);
 }
 
-/* Rx queue */
+/* Post a umem frame id onto an id (fill/completion) ring and make it
+ * visible to the consumer. The caller must have reserved the slot with
+ * xskq_reserve_id() beforehand; always returns 0.
+ */
+static inline int xskq_produce_id(struct xsk_queue *q, u32 id)
+{
+       struct xdp_umem_ring *ring = (struct xdp_umem_ring *)q->ring;
+
+       ring->desc[q->prod_tail++ & q->ring_mask] = id;
+
+       /* Order producer and data */
+       smp_wmb();
+
+       WRITE_ONCE(q->ring->producer, q->prod_tail);
+       return 0;
+}
+
+/* Reserve one producer slot on an id ring without publishing anything.
+ * Only the local prod_head is advanced; the entry becomes visible later
+ * via xskq_produce_id(). Returns 0 on success, -ENOSPC if the ring is
+ * full.
+ */
+static inline int xskq_reserve_id(struct xsk_queue *q)
+{
+       if (xskq_nb_free(q, q->prod_head, 1) == 0)
+               return -ENOSPC;
+
+       q->prod_head++;
+       return 0;
+}
+
+/* Rx/Tx queue */
+
+/* Validate a user-supplied Rx/Tx descriptor: the frame index must lie
+ * within the umem, and the (offset, len) window must be non-empty and
+ * fit entirely inside one frame. Invalid descriptors are counted in
+ * q->invalid_descs for statistics.
+ */
+static inline bool xskq_is_valid_desc(struct xsk_queue *q, struct xdp_desc *d)
+{
+       u32 buff_len;
+
+       if (unlikely(d->idx >= q->umem_props.nframes)) {
+               q->invalid_descs++;
+               return false;
+       }
+
+       buff_len = q->umem_props.frame_size;
+       if (unlikely(d->len > buff_len || d->len == 0 ||
+                    d->offset > buff_len || d->offset + d->len > buff_len)) {
+               q->invalid_descs++;
+               return false;
+       }
+
+       return true;
+}
+
+/* Scan from cons_tail towards cons_head for the first valid descriptor.
+ * Invalid entries are skipped (cons_tail advances past them, silently
+ * dropping them). On success the descriptor is copied into *desc (when
+ * desc is non-NULL) and desc is returned; NULL means no valid entry in
+ * the cached window.
+ */
+static inline struct xdp_desc *xskq_validate_desc(struct xsk_queue *q,
+                                                 struct xdp_desc *desc)
+{
+       while (q->cons_tail != q->cons_head) {
+               struct xdp_rxtx_ring *ring = (struct xdp_rxtx_ring *)q->ring;
+               unsigned int idx = q->cons_tail & q->ring_mask;
+
+               if (xskq_is_valid_desc(q, &ring->desc[idx])) {
+                       if (desc)
+                               *desc = ring->desc[idx];
+                       return desc;
+               }
+
+               q->cons_tail++;
+       }
+
+       return NULL;
+}
+
+/* Peek at the next descriptor without consuming it.
+ *
+ * When the locally cached window (cons_tail..cons_head) is exhausted,
+ * the consumer pointer is published to user space and the cache is
+ * refilled with up to RX_BATCH_SIZE new entries before validating the
+ * next descriptor. Returns desc on success, NULL if the ring is empty.
+ */
+static inline struct xdp_desc *xskq_peek_desc(struct xsk_queue *q,
+                                             struct xdp_desc *desc)
+{
+       struct xdp_rxtx_ring *ring;
+
+       if (q->cons_tail == q->cons_head) {
+               WRITE_ONCE(q->ring->consumer, q->cons_tail);
+               q->cons_head = q->cons_tail + xskq_nb_avail(q, RX_BATCH_SIZE);
+
+               /* Order consumer and data */
+               smp_rmb();
+
+               return xskq_validate_desc(q, desc);
+       }
+
+       /* Fast path: a validated entry is already cached. */
+       ring = (struct xdp_rxtx_ring *)q->ring;
+       *desc = ring->desc[q->cons_tail & q->ring_mask];
+       return desc;
+}
+
+/* Consume the current descriptor and pre-validate the next one so a
+ * subsequent xskq_peek_desc() can take the fast path.
+ */
+static inline void xskq_discard_desc(struct xsk_queue *q)
+{
+       q->cons_tail++;
+       (void)xskq_validate_desc(q, NULL);
+}
 
 static inline int xskq_produce_batch_desc(struct xsk_queue *q,
                                          u32 id, u32 len, u16 offset)
@@ -139,6 +225,11 @@ static inline void xskq_produce_flush_desc(struct xsk_queue *q)
        WRITE_ONCE(q->ring->producer, q->prod_tail);
 }
 
+/* True when the descriptor ring is completely full, i.e. all nentries
+ * slots hold entries available for consumption.
+ */
+static inline bool xskq_full_desc(struct xsk_queue *q)
+{
+       return (xskq_nb_avail(q, q->nentries) == q->nentries);
+}
+
 static inline bool xskq_empty_desc(struct xsk_queue *q)
 {
        return (xskq_nb_free(q, q->prod_tail, 1) == q->nentries);