tipc: add new functions for multicast and broadcast distribution
authorJon Paul Maloy <jon.maloy@ericsson.com>
Thu, 17 Jul 2014 00:41:00 +0000 (20:41 -0400)
committerDavid S. Miller <davem@davemloft.net>
Thu, 17 Jul 2014 04:38:18 +0000 (21:38 -0700)
We add a new broadcast link transmit function in bclink.c and a new
receive function in socket.c. The purpose is to move the branching
between external and internal destination down to the link layer,
just as we have done with unicast in earlier commits. We also make
use of the new link-independent fragmentation support that was
introduced in an earlier commit series.

This gives a shorter and simpler code path, and makes it possible
to obtain copy-free buffer delivery to all node local destination
sockets.

The new transmission code is added in parallel with the existing one,
and will be used by the socket multicast send function in the next
commit in this series.

Signed-off-by: Jon Maloy <jon.maloy@ericsson.com>
Reviewed-by: Erik Hugne <erik.hugne@ericsson.com>
Reviewed-by: Ying Xue <ying.xue@windriver.com>
Signed-off-by: David S. Miller <davem@davemloft.net>
net/tipc/bcast.c
net/tipc/bcast.h
net/tipc/msg.c
net/tipc/msg.h
net/tipc/socket.c
net/tipc/socket.h

index 55c6c9d3e1ceee905bd25ce09d4c8bd7c41a3412..ac947251dd37605a5fe7117baf2c6ccdf1cbbcbe 100644 (file)
@@ -1,7 +1,7 @@
 /*
  * net/tipc/bcast.c: TIPC broadcast code
  *
- * Copyright (c) 2004-2006, Ericsson AB
+ * Copyright (c) 2004-2006, 2014, Ericsson AB
  * Copyright (c) 2004, Intel Corporation.
  * Copyright (c) 2005, 2010-2011, Wind River Systems
  * All rights reserved.
@@ -38,6 +38,8 @@
 #include "core.h"
 #include "link.h"
 #include "port.h"
+#include "socket.h"
+#include "msg.h"
 #include "bcast.h"
 #include "name_distr.h"
 
@@ -138,6 +140,11 @@ static void tipc_bclink_unlock(void)
                tipc_link_reset_all(node);
 }
 
+uint  tipc_bclink_get_mtu(void)
+{
+       return MAX_PKT_DEFAULT_MCAST;
+}
+
 void tipc_bclink_set_flags(unsigned int flags)
 {
        bclink->flags |= flags;
@@ -408,6 +415,52 @@ exit:
        return res;
 }
 
+/* tipc_bclink_xmit2 - broadcast buffer chain to all nodes in cluster
+ *                     and to identified node local sockets
+ * @buf: chain of buffers containing message
+ * Consumes the buffer chain, except when returning -ELINKCONG
+ * Returns 0 if success, otherwise errno: -ELINKCONG,-EHOSTUNREACH,-EMSGSIZE
+ */
+int tipc_bclink_xmit2(struct sk_buff *buf)
+{
+       int rc = 0;
+       int bc = 0;
+       struct sk_buff *clbuf;
+
+       /* Prepare clone of message for local node */
+       clbuf = tipc_msg_reassemble(buf);
+       if (unlikely(!clbuf)) {
+               kfree_skb_list(buf);
+               return -EHOSTUNREACH;
+       }
+
+       /* Broadcast to all other nodes */
+       if (likely(bclink)) {
+               tipc_bclink_lock();
+               if (likely(bclink->bcast_nodes.count)) {
+                       rc = __tipc_link_xmit(bcl, buf);
+                       if (likely(!rc)) {
+                               bclink_set_last_sent();
+                               bcl->stats.queue_sz_counts++;
+                               bcl->stats.accu_queue_sz += bcl->out_queue_size;
+                       }
+                       bc = 1;
+               }
+               tipc_bclink_unlock();
+       }
+
+       if (unlikely(!bc))
+               kfree_skb_list(buf);
+
+       /* Deliver message clone */
+       if (likely(!rc))
+               tipc_sk_mcast_rcv(clbuf);
+       else
+               kfree_skb(clbuf);
+
+       return rc;
+}
+
 /**
  * bclink_accept_pkt - accept an incoming, in-sequence broadcast packet
  *
index 00330c45df3e04d03626a31d5f2ee6ba66569298..d90645f408aa9e2a65517a8686e46cb847c27259 100644 (file)
@@ -1,7 +1,7 @@
 /*
  * net/tipc/bcast.h: Include file for TIPC broadcast code
  *
- * Copyright (c) 2003-2006, Ericsson AB
+ * Copyright (c) 2003-2006, 2014, Ericsson AB
  * Copyright (c) 2005, 2010-2011, Wind River Systems
  * All rights reserved.
  *
@@ -98,5 +98,7 @@ int  tipc_bclink_stats(char *stats_buf, const u32 buf_size);
 int  tipc_bclink_reset_stats(void);
 int  tipc_bclink_set_queue_limits(u32 limit);
 void tipc_bcbearer_sort(struct tipc_node_map *nm_ptr, u32 node, bool action);
+uint  tipc_bclink_get_mtu(void);
+int tipc_bclink_xmit2(struct sk_buff *buf);
 
 #endif
index ce6d929d66d26c88c4d6ee5fc9a37a916ca719e1..9682296f5e7cea595ed981fdcae15f378708c50a 100644 (file)
@@ -417,3 +417,38 @@ int tipc_msg_eval(struct sk_buff *buf, u32 *dnode)
        msg_set_destport(msg, dport);
        return TIPC_OK;
 }
+
+/* tipc_msg_reassemble() - clone a buffer chain of fragments and
+ *                         reassemble the clones into one message
+ */
+struct sk_buff *tipc_msg_reassemble(struct sk_buff *chain)
+{
+       struct sk_buff *buf = chain;
+       struct sk_buff *frag = buf;
+       struct sk_buff *head = NULL;
+       int hdr_sz;
+
+       /* Copy header if single buffer */
+       if (!buf->next) {
+               hdr_sz = skb_headroom(buf) + msg_hdr_sz(buf_msg(buf));
+               return __pskb_copy(buf, hdr_sz, GFP_ATOMIC);
+       }
+
+       /* Clone all fragments and reassemble */
+       while (buf) {
+               frag = skb_clone(buf, GFP_ATOMIC);
+               if (!frag)
+                       goto error;
+               frag->next = NULL;
+               if (tipc_buf_append(&head, &frag))
+                       break;
+               if (!head)
+                       goto error;
+               buf = buf->next;
+       }
+       return frag;
+error:
+       pr_warn("Failed do clone local mcast rcv buffer\n");
+       kfree_skb(head);
+       return NULL;
+}
index 7d574346e75e2348b7e1192a9628b5d9b807f1ec..a15d59601bf9d32b7dee9cdb134fc871305e6603 100644 (file)
@@ -744,4 +744,6 @@ bool tipc_msg_make_bundle(struct sk_buff **buf, u32 mtu, u32 dnode);
 int tipc_msg_build2(struct tipc_msg *mhdr, struct iovec const *iov,
                    int offset, int dsz, int mtu , struct sk_buff **chain);
 
+struct sk_buff *tipc_msg_reassemble(struct sk_buff *chain);
+
 #endif
index de01622672b2bdf8cbc53d0c4e4ba82118a7da04..8d30995682b1904da628c8d8d5597a6cf65f01f3 100644 (file)
@@ -534,6 +534,46 @@ static unsigned int tipc_poll(struct file *file, struct socket *sock,
        return mask;
 }
 
+/* tipc_sk_mcast_rcv - Deliver multicast message to all destination sockets
+ */
+void tipc_sk_mcast_rcv(struct sk_buff *buf)
+{
+       struct tipc_msg *msg = buf_msg(buf);
+       struct tipc_port_list dports = {0, NULL, };
+       struct tipc_port_list *item;
+       struct sk_buff *b;
+       uint i, last, dst = 0;
+       u32 scope = TIPC_CLUSTER_SCOPE;
+
+       if (in_own_node(msg_orignode(msg)))
+               scope = TIPC_NODE_SCOPE;
+
+       /* Create destination port list: */
+       tipc_nametbl_mc_translate(msg_nametype(msg),
+                                 msg_namelower(msg),
+                                 msg_nameupper(msg),
+                                 scope,
+                                 &dports);
+       last = dports.count;
+       if (!last) {
+               kfree_skb(buf);
+               return;
+       }
+
+       for (item = &dports; item; item = item->next) {
+               for (i = 0; i < PLSIZE && ++dst <= last; i++) {
+                       b = (dst != last) ? skb_clone(buf, GFP_ATOMIC) : buf;
+                       if (!b) {
+                               pr_warn("Failed do clone mcast rcv buffer\n");
+                               continue;
+                       }
+                       msg_set_destport(msg, item->ports[i]);
+                       tipc_sk_rcv(b);
+               }
+       }
+       tipc_port_list_free(&dports);
+}
+
 /**
  * tipc_sk_proto_rcv - receive a connection mng protocol message
  * @tsk: receiving socket
index 2cdede9eda1ba48bf9b4da79ec3b03a8c0b859c3..43b75b3cecedb9b3bee4fd6a9a42a8f5a4cbcf0b 100644 (file)
@@ -85,4 +85,6 @@ static inline int tipc_sk_conn_cong(struct tipc_sock *tsk)
 
 int tipc_sk_rcv(struct sk_buff *buf);
 
+void tipc_sk_mcast_rcv(struct sk_buff *buf);
+
 #endif