tipc: add trace_events for tipc socket
authorTuong Lien <tuong.t.lien@dektech.com.au>
Wed, 19 Dec 2018 02:17:58 +0000 (09:17 +0700)
committerDavid S. Miller <davem@davemloft.net>
Wed, 19 Dec 2018 19:49:24 +0000 (11:49 -0800)
The commit adds the new trace_events for TIPC socket object:

trace_tipc_sk_create()
trace_tipc_sk_poll()
trace_tipc_sk_sendmsg()
trace_tipc_sk_sendmcast()
trace_tipc_sk_sendstream()
trace_tipc_sk_filter_rcv()
trace_tipc_sk_advance_rx()
trace_tipc_sk_rej_msg()
trace_tipc_sk_drop_msg()
trace_tipc_sk_release()
trace_tipc_sk_shutdown()
trace_tipc_sk_overlimit1()
trace_tipc_sk_overlimit2()

Also, enables the traces for the following cases:
- When user creates a TIPC socket;
- When user calls poll() on TIPC socket;
- When user sends a dgram/mcast/stream message.
- When a message is put into the socket 'sk_receive_queue';
- When a message is released from the socket 'sk_receive_queue';
- When a message is rejected (e.g. due to no port, invalid, etc.);
- When a message is dropped (e.g. due to wrong message type);
- When socket is released;
- When socket is shutdown;
- When socket rcvq's allocation is overlimit (> 90%);
- When socket rcvq + bklq's allocation is overlimit (> 90%);
- When the 'TIPC_ERR_OVERLOAD/2' issue happens;

Note:
a) All the socket traces are designed to be able to trace on a specific
socket by either using the 'event filtering' feature on a known socket
'portid' value or the sysctl file:

/proc/sys/net/tipc/sk_filter

The file determines a 'tuple' for what socket should be traced:

(portid, sock type, name type, name lower, name upper)

where:
+ 'portid' is the socket portid generated at socket creating, can be
found in the trace outputs or the 'tipc socket list' command printouts;
+ 'sock type' is the socket type (1 = SOCK_TREAM, ...);
+ 'name type', 'name lower' and 'name upper' are the service name being
connected to or published by the socket.

Value '0' means 'ANY', the default tuple value is (0, 0, 0, 0, 0) i.e.
the traces happen for every sockets with no filter.

b) The 'tipc_sk_overlimit1/2' event is also a conditional trace_event
which happens when the socket receive queue (and backlog queue) is
about to be overloaded, when the queue allocation is > 90%. Then, when
the trace is enabled, the last skbs leading to the TIPC_ERR_OVERLOAD/2
issue can be traced.

The trace event is designed as an 'upper watermark' notification that
the other traces (e.g. 'tipc_sk_advance_rx' vs 'tipc_sk_filter_rcv') or
actions can be triggerred in the meanwhile to see what is going on with
the socket queue.

In addition, the 'trace_tipc_sk_dump()' is also placed at the
'TIPC_ERR_OVERLOAD/2' case, so the socket and last skb can be dumped
for post-analysis.

Acked-by: Ying Xue <ying.xue@windriver.com>
Tested-by: Ying Xue <ying.xue@windriver.com>
Acked-by: Jon Maloy <jon.maloy@ericsson.com>
Signed-off-by: Tuong Lien <tuong.t.lien@dektech.com.au>
Signed-off-by: David S. Miller <davem@davemloft.net>
net/tipc/socket.c
net/tipc/socket.h
net/tipc/sysctl.c
net/tipc/trace.c
net/tipc/trace.h

index b6b2a94eb54ec3b7800879b3935231dd525b23b4..291d6bbe85f48910583b3b6d869aae184bb89b11 100644 (file)
@@ -234,6 +234,7 @@ static u16 tsk_inc(struct tipc_sock *tsk, int msglen)
  */
 static void tsk_advance_rx_queue(struct sock *sk)
 {
+       trace_tipc_sk_advance_rx(sk, NULL, TIPC_DUMP_SK_RCVQ, " ");
        kfree_skb(__skb_dequeue(&sk->sk_receive_queue));
 }
 
@@ -248,6 +249,7 @@ static void tipc_sk_respond(struct sock *sk, struct sk_buff *skb, int err)
        if (!tipc_msg_reverse(onode, &skb, err))
                return;
 
+       trace_tipc_sk_rej_msg(sk, skb, TIPC_DUMP_NONE, "@sk_respond!");
        dnode = msg_destnode(buf_msg(skb));
        selector = msg_origport(buf_msg(skb));
        tipc_node_xmit_skb(sock_net(sk), skb, dnode, selector);
@@ -483,6 +485,7 @@ static int tipc_sk_create(struct net *net, struct socket *sock,
                        tsk_set_unreliable(tsk, true);
        }
 
+       trace_tipc_sk_create(sk, NULL, TIPC_DUMP_NONE, " ");
        return 0;
 }
 
@@ -572,6 +575,7 @@ static int tipc_release(struct socket *sock)
        tsk = tipc_sk(sk);
        lock_sock(sk);
 
+       trace_tipc_sk_release(sk, NULL, TIPC_DUMP_ALL, " ");
        __tipc_shutdown(sock, TIPC_ERR_NO_PORT);
        sk->sk_shutdown = SHUTDOWN_MASK;
        tipc_sk_leave(tsk);
@@ -719,6 +723,7 @@ static __poll_t tipc_poll(struct file *file, struct socket *sock,
        __poll_t revents = 0;
 
        sock_poll_wait(file, sock, wait);
+       trace_tipc_sk_poll(sk, NULL, TIPC_DUMP_ALL, " ");
 
        if (sk->sk_shutdown & RCV_SHUTDOWN)
                revents |= EPOLLRDHUP | EPOLLIN | EPOLLRDNORM;
@@ -805,9 +810,12 @@ static int tipc_sendmcast(struct  socket *sock, struct tipc_name_seq *seq,
        rc = tipc_msg_build(hdr, msg, 0, dlen, mtu, &pkts);
 
        /* Send message if build was successful */
-       if (unlikely(rc == dlen))
+       if (unlikely(rc == dlen)) {
+               trace_tipc_sk_sendmcast(sk, skb_peek(&pkts),
+                                       TIPC_DUMP_SK_SNDQ, " ");
                rc = tipc_mcast_xmit(net, &pkts, method, &dsts,
                                     &tsk->cong_link_cnt);
+       }
 
        tipc_nlist_purge(&dsts);
 
@@ -1209,8 +1217,10 @@ static void tipc_sk_conn_proto_rcv(struct tipc_sock *tsk, struct sk_buff *skb,
        bool conn_cong;
 
        /* Ignore if connection cannot be validated: */
-       if (!tsk_peer_msg(tsk, hdr))
+       if (!tsk_peer_msg(tsk, hdr)) {
+               trace_tipc_sk_drop_msg(sk, skb, TIPC_DUMP_NONE, "@proto_rcv!");
                goto exit;
+       }
 
        if (unlikely(msg_errcode(hdr))) {
                tipc_set_sk_state(sk, TIPC_DISCONNECTING);
@@ -1378,6 +1388,7 @@ static int __tipc_sendmsg(struct socket *sock, struct msghdr *m, size_t dlen)
        if (unlikely(syn && !tipc_msg_skb_clone(&pkts, &sk->sk_write_queue)))
                return -ENOMEM;
 
+       trace_tipc_sk_sendmsg(sk, skb_peek(&pkts), TIPC_DUMP_SK_SNDQ, " ");
        rc = tipc_node_xmit(net, &pkts, dnode, tsk->portid);
        if (unlikely(rc == -ELINKCONG)) {
                tipc_dest_push(clinks, dnode, 0);
@@ -1455,6 +1466,8 @@ static int __tipc_sendstream(struct socket *sock, struct msghdr *m, size_t dlen)
                if (unlikely(rc != send))
                        break;
 
+               trace_tipc_sk_sendstream(sk, skb_peek(&pkts),
+                                        TIPC_DUMP_SK_SNDQ, " ");
                rc = tipc_node_xmit(net, &pkts, dnode, tsk->portid);
                if (unlikely(rc == -ELINKCONG)) {
                        tsk->cong_link_cnt = 1;
@@ -2129,6 +2142,7 @@ static void tipc_sk_filter_rcv(struct sock *sk, struct sk_buff *skb,
        struct sk_buff_head inputq;
        int limit, err = TIPC_OK;
 
+       trace_tipc_sk_filter_rcv(sk, skb, TIPC_DUMP_ALL, " ");
        TIPC_SKB_CB(skb)->bytes_read = 0;
        __skb_queue_head_init(&inputq);
        __skb_queue_tail(&inputq, skb);
@@ -2148,17 +2162,25 @@ static void tipc_sk_filter_rcv(struct sock *sk, struct sk_buff *skb,
                    (!grp && msg_in_group(hdr)))
                        err = TIPC_ERR_NO_PORT;
                else if (sk_rmem_alloc_get(sk) + skb->truesize >= limit) {
+                       trace_tipc_sk_dump(sk, skb, TIPC_DUMP_ALL,
+                                          "err_overload2!");
                        atomic_inc(&sk->sk_drops);
                        err = TIPC_ERR_OVERLOAD;
                }
 
                if (unlikely(err)) {
-                       tipc_skb_reject(net, err, skb, xmitq);
+                       if (tipc_msg_reverse(tipc_own_addr(net), &skb, err)) {
+                               trace_tipc_sk_rej_msg(sk, skb, TIPC_DUMP_NONE,
+                                                     "@filter_rcv!");
+                               __skb_queue_tail(xmitq, skb);
+                       }
                        err = TIPC_OK;
                        continue;
                }
                __skb_queue_tail(&sk->sk_receive_queue, skb);
                skb_set_owner_r(skb, sk);
+               trace_tipc_sk_overlimit2(sk, skb, TIPC_DUMP_ALL,
+                                        "rcvq >90% allocated!");
                sk->sk_data_ready(sk);
        }
 }
@@ -2224,14 +2246,21 @@ static void tipc_sk_enqueue(struct sk_buff_head *inputq, struct sock *sk,
                if (!sk->sk_backlog.len)
                        atomic_set(dcnt, 0);
                lim = rcvbuf_limit(sk, skb) + atomic_read(dcnt);
-               if (likely(!sk_add_backlog(sk, skb, lim)))
+               if (likely(!sk_add_backlog(sk, skb, lim))) {
+                       trace_tipc_sk_overlimit1(sk, skb, TIPC_DUMP_ALL,
+                                                "bklg & rcvq >90% allocated!");
                        continue;
+               }
 
+               trace_tipc_sk_dump(sk, skb, TIPC_DUMP_ALL, "err_overload!");
                /* Overload => reject message back to sender */
                onode = tipc_own_addr(sock_net(sk));
                atomic_inc(&sk->sk_drops);
-               if (tipc_msg_reverse(onode, &skb, TIPC_ERR_OVERLOAD))
+               if (tipc_msg_reverse(onode, &skb, TIPC_ERR_OVERLOAD)) {
+                       trace_tipc_sk_rej_msg(sk, skb, TIPC_DUMP_ALL,
+                                             "@sk_enqueue!");
                        __skb_queue_tail(xmitq, skb);
+               }
                break;
        }
 }
@@ -2280,6 +2309,8 @@ void tipc_sk_rcv(struct net *net, struct sk_buff_head *inputq)
                /* Prepare for message rejection */
                if (!tipc_msg_reverse(tipc_own_addr(net), &skb, err))
                        continue;
+
+               trace_tipc_sk_rej_msg(NULL, skb, TIPC_DUMP_NONE, "@sk_rcv!");
 xmit:
                dnode = msg_destnode(buf_msg(skb));
                tipc_node_xmit_skb(net, skb, dnode, dport);
@@ -2553,6 +2584,7 @@ static int tipc_shutdown(struct socket *sock, int how)
 
        lock_sock(sk);
 
+       trace_tipc_sk_shutdown(sk, NULL, TIPC_DUMP_ALL, " ");
        __tipc_shutdown(sock, TIPC_CONN_SHUTDOWN);
        sk->sk_shutdown = SEND_SHUTDOWN;
 
@@ -3566,11 +3598,106 @@ int tipc_nl_publ_dump(struct sk_buff *skb, struct netlink_callback *cb)
        return skb->len;
 }
 
+/**
+ * tipc_sk_filtering - check if a socket should be traced
+ * @sk: the socket to be examined
+ * @sysctl_tipc_sk_filter[]: the socket tuple for filtering,
+ *  (portid, sock type, name type, name lower, name upper)
+ *
+ * Returns true if the socket meets the socket tuple data
+ * (value 0 = 'any') or when there is no tuple set (all = 0),
+ * otherwise false
+ */
+bool tipc_sk_filtering(struct sock *sk)
+{
+       struct tipc_sock *tsk;
+       struct publication *p;
+       u32 _port, _sktype, _type, _lower, _upper;
+       u32 type = 0, lower = 0, upper = 0;
+
+       if (!sk)
+               return true;
+
+       tsk = tipc_sk(sk);
+
+       _port = sysctl_tipc_sk_filter[0];
+       _sktype = sysctl_tipc_sk_filter[1];
+       _type = sysctl_tipc_sk_filter[2];
+       _lower = sysctl_tipc_sk_filter[3];
+       _upper = sysctl_tipc_sk_filter[4];
+
+       if (!_port && !_sktype && !_type && !_lower && !_upper)
+               return true;
+
+       if (_port)
+               return (_port == tsk->portid);
+
+       if (_sktype && _sktype != sk->sk_type)
+               return false;
+
+       if (tsk->published) {
+               p = list_first_entry_or_null(&tsk->publications,
+                                            struct publication, binding_sock);
+               if (p) {
+                       type = p->type;
+                       lower = p->lower;
+                       upper = p->upper;
+               }
+       }
+
+       if (!tipc_sk_type_connectionless(sk)) {
+               type = tsk->conn_type;
+               lower = tsk->conn_instance;
+               upper = tsk->conn_instance;
+       }
+
+       if ((_type && _type != type) || (_lower && _lower != lower) ||
+           (_upper && _upper != upper))
+               return false;
+
+       return true;
+}
+
 u32 tipc_sock_get_portid(struct sock *sk)
 {
        return (sk) ? (tipc_sk(sk))->portid : 0;
 }
 
+/**
+ * tipc_sk_overlimit1 - check if socket rx queue is about to be overloaded,
+ *                     both the rcv and backlog queues are considered
+ * @sk: tipc sk to be checked
+ * @skb: tipc msg to be checked
+ *
+ * Returns true if the socket rx queue allocation is > 90%, otherwise false
+ */
+
+bool tipc_sk_overlimit1(struct sock *sk, struct sk_buff *skb)
+{
+       atomic_t *dcnt = &tipc_sk(sk)->dupl_rcvcnt;
+       unsigned int lim = rcvbuf_limit(sk, skb) + atomic_read(dcnt);
+       unsigned int qsize = sk->sk_backlog.len + sk_rmem_alloc_get(sk);
+
+       return (qsize > lim * 90 / 100);
+}
+
+/**
+ * tipc_sk_overlimit2 - check if socket rx queue is about to be overloaded,
+ *                     only the rcv queue is considered
+ * @sk: tipc sk to be checked
+ * @skb: tipc msg to be checked
+ *
+ * Returns true if the socket rx queue allocation is > 90%, otherwise false
+ */
+
+bool tipc_sk_overlimit2(struct sock *sk, struct sk_buff *skb)
+{
+       unsigned int lim = rcvbuf_limit(sk, skb);
+       unsigned int qsize = sk_rmem_alloc_get(sk);
+
+       return (qsize > lim * 90 / 100);
+}
+
 /**
  * tipc_sk_dump - dump TIPC socket
  * @sk: tipc sk to be dumped
index 07e36545b69646d8896654cf16832d612465a1ed..235b9679acee4415455be63c775f6f6587331b1a 100644 (file)
@@ -72,5 +72,7 @@ int tipc_dump_start(struct netlink_callback *cb);
 int __tipc_dump_start(struct netlink_callback *cb, struct net *net);
 int tipc_dump_done(struct netlink_callback *cb);
 u32 tipc_sock_get_portid(struct sock *sk);
+bool tipc_sk_overlimit1(struct sock *sk, struct sk_buff *skb);
+bool tipc_sk_overlimit2(struct sock *sk, struct sk_buff *skb);
 
 #endif
index 1a779b1e85100b501c1f0cde29cbfaa0d98fb4eb..3481e4906bd6a4a3e1f27ec5d49106090c7ec7f1 100644 (file)
@@ -34,6 +34,7 @@
  */
 
 #include "core.h"
+#include "trace.h"
 
 #include <linux/sysctl.h>
 
@@ -54,6 +55,13 @@ static struct ctl_table tipc_table[] = {
                .mode           = 0644,
                .proc_handler   = proc_dointvec,
        },
+       {
+               .procname       = "sk_filter",
+               .data           = &sysctl_tipc_sk_filter,
+               .maxlen         = sizeof(sysctl_tipc_sk_filter),
+               .mode           = 0644,
+               .proc_handler   = proc_doulongvec_minmax,
+       },
        {}
 };
 
index 846196f0e810cd59cbe57a088413e1f45aef7ad2..964823841efe955187d595e87768928b1f7ee658 100644 (file)
 #define CREATE_TRACE_POINTS
 #include "trace.h"
 
+/**
+ * socket tuples for filtering in socket traces:
+ * (portid, sock type, name type, name lower, name upper)
+ */
+unsigned long sysctl_tipc_sk_filter[5] __read_mostly = {0, };
+
 /**
  * tipc_skb_dump - dump TIPC skb data
  * @skb: skb to be dumped
index 535c8958651f8da55ed80d13eacd80982a3c5012..ebbfcd14627e38e667e107abd040b9b29a2f520e 100644 (file)
@@ -113,11 +113,14 @@ enum {
                        {(0xcbe),       "SYNCH_BEGIN_EVT"               },\
                        {(0xcee),       "SYNCH_END_EVT"                 })
 
+extern unsigned long sysctl_tipc_sk_filter[5] __read_mostly;
+
 int tipc_skb_dump(struct sk_buff *skb, bool more, char *buf);
 int tipc_list_dump(struct sk_buff_head *list, bool more, char *buf);
 int tipc_sk_dump(struct sock *sk, u16 dqueues, char *buf);
 int tipc_link_dump(struct tipc_link *l, u16 dqueues, char *buf);
 int tipc_node_dump(struct tipc_node *n, bool more, char *buf);
+bool tipc_sk_filtering(struct sock *sk);
 
 DECLARE_EVENT_CLASS(tipc_skb_class,
 
@@ -199,12 +202,33 @@ DECLARE_EVENT_CLASS(tipc_sk_class,
                  __get_str(skb_buf), __get_str(buf))
 );
 
-#define DEFINE_SK_EVENT(name) \
-DEFINE_EVENT(tipc_sk_class, name, \
+#define DEFINE_SK_EVENT_FILTER(name) \
+DEFINE_EVENT_CONDITION(tipc_sk_class, name, \
+       TP_PROTO(struct sock *sk, struct sk_buff *skb, u16 dqueues, \
+                const char *header), \
+       TP_ARGS(sk, skb, dqueues, header), \
+       TP_CONDITION(tipc_sk_filtering(sk)))
+DEFINE_SK_EVENT_FILTER(tipc_sk_dump);
+DEFINE_SK_EVENT_FILTER(tipc_sk_create);
+DEFINE_SK_EVENT_FILTER(tipc_sk_sendmcast);
+DEFINE_SK_EVENT_FILTER(tipc_sk_sendmsg);
+DEFINE_SK_EVENT_FILTER(tipc_sk_sendstream);
+DEFINE_SK_EVENT_FILTER(tipc_sk_poll);
+DEFINE_SK_EVENT_FILTER(tipc_sk_filter_rcv);
+DEFINE_SK_EVENT_FILTER(tipc_sk_advance_rx);
+DEFINE_SK_EVENT_FILTER(tipc_sk_rej_msg);
+DEFINE_SK_EVENT_FILTER(tipc_sk_drop_msg);
+DEFINE_SK_EVENT_FILTER(tipc_sk_release);
+DEFINE_SK_EVENT_FILTER(tipc_sk_shutdown);
+
+#define DEFINE_SK_EVENT_FILTER_COND(name, cond) \
+DEFINE_EVENT_CONDITION(tipc_sk_class, name, \
        TP_PROTO(struct sock *sk, struct sk_buff *skb, u16 dqueues, \
                 const char *header), \
-       TP_ARGS(sk, skb, dqueues, header))
-DEFINE_SK_EVENT(tipc_sk_dump);
+       TP_ARGS(sk, skb, dqueues, header), \
+       TP_CONDITION(tipc_sk_filtering(sk) && (cond)))
+DEFINE_SK_EVENT_FILTER_COND(tipc_sk_overlimit1, tipc_sk_overlimit1(sk, skb));
+DEFINE_SK_EVENT_FILTER_COND(tipc_sk_overlimit2, tipc_sk_overlimit2(sk, skb));
 
 DECLARE_EVENT_CLASS(tipc_link_class,