rxrpc: Add tracepoint for working out where aborts happen
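
The interesting change in this file is that local aborts now go through
rxrpc_abort_call(), which takes a three-letter "why" marker ("EXP" for an
expired call, "PRO" for a protocol error) along with the abort code and
error value so that a tracepoint can record where the abort was generated.
The helper itself is defined outside this file; purely as a sketch of what
the new call sites below assume (the tracepoint name and the completion
fields are assumptions, not taken from this diff), it would look something
like:

	/* Sketch only: abort a call locally, recording in a tracepoint
	 * which code path ("why") generated the abort.
	 */
	static bool rxrpc_abort_call(const char *why, struct rxrpc_call *call,
				     u32 seq, u32 abort_code, int error)
	{
		bool aborted = false;

		write_lock_bh(&call->state_lock);
		if (call->state < RXRPC_CALL_COMPLETE) {
			trace_rxrpc_abort(why, call->call_id, seq,
					  abort_code, error);
			call->abort_code = abort_code;
			call->error = error;
			call->completion = RXRPC_CALL_LOCALLY_ABORTED;
			call->state = RXRPC_CALL_COMPLETE;
			set_bit(RXRPC_CALL_EV_ABORT, &call->events);
			aborted = true;
		}
		write_unlock_bh(&call->state_lock);
		return aborted;
	}
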
diff --git a/net/rxrpc/call_event.c b/net/rxrpc/call_event.c
index e60cf65c223237fd1a9bbf8bd6e9d72a16742b32..af88ad7d2cf96d3c6ea92a52afe9dcf5d7e3abaf 100644
--- a/net/rxrpc/call_event.c
+++ b/net/rxrpc/call_event.c
@@ -25,7 +25,7 @@
  * propose an ACK be sent
  */
 void __rxrpc_propose_ACK(struct rxrpc_call *call, u8 ack_reason,
-                        u32 serial, bool immediate)
+                        u16 skew, u32 serial, bool immediate)
 {
        unsigned long expiry;
        s8 prior = rxrpc_ack_priority[ack_reason];
@@ -44,8 +44,10 @@ void __rxrpc_propose_ACK(struct rxrpc_call *call, u8 ack_reason,
        /* update DELAY, IDLE, REQUESTED and PING_RESPONSE ACK serial
         * numbers */
        if (prior == rxrpc_ack_priority[call->ackr_reason]) {
-               if (prior <= 4)
+               if (prior <= 4) {
+                       call->ackr_skew = skew;
                        call->ackr_serial = serial;
+               }
                if (immediate)
                        goto cancel_timer;
                return;
@@ -93,7 +95,7 @@ cancel_timer:
        _debug("cancel timer %%%u", serial);
        try_to_del_timer_sync(&call->ack_timer);
        read_lock_bh(&call->state_lock);
-       if (call->state <= RXRPC_CALL_COMPLETE &&
+       if (call->state < RXRPC_CALL_COMPLETE &&
            !test_and_set_bit(RXRPC_CALL_EV_ACK, &call->events))
                rxrpc_queue_call(call);
        read_unlock_bh(&call->state_lock);
@@ -103,13 +105,13 @@ cancel_timer:
  * propose an ACK be sent, locking the call structure
  */
 void rxrpc_propose_ACK(struct rxrpc_call *call, u8 ack_reason,
-                      u32 serial, bool immediate)
+                      u16 skew, u32 serial, bool immediate)
 {
        s8 prior = rxrpc_ack_priority[ack_reason];
 
        if (prior > rxrpc_ack_priority[call->ackr_reason]) {
                spin_lock_bh(&call->lock);
-               __rxrpc_propose_ACK(call, ack_reason, serial, immediate);
+               __rxrpc_propose_ACK(call, ack_reason, skew, serial, immediate);
                spin_unlock_bh(&call->lock);
        }
 }
@@ -121,7 +123,7 @@ static void rxrpc_set_resend(struct rxrpc_call *call, u8 resend,
                             unsigned long resend_at)
 {
        read_lock_bh(&call->state_lock);
-       if (call->state >= RXRPC_CALL_COMPLETE)
+       if (call->state == RXRPC_CALL_COMPLETE)
                resend = 0;
 
        if (resend & 1) {
@@ -191,6 +193,8 @@ static void rxrpc_resend(struct rxrpc_call *call)
                                stop = true;
                                sp->resend_at = jiffies + 3;
                        } else {
+                               if (rxrpc_is_client_call(call))
+                                       rxrpc_expose_client_call(call);
                                sp->resend_at =
                                        jiffies + rxrpc_resend_timeout;
                        }
@@ -226,7 +230,7 @@ static void rxrpc_resend_timer(struct rxrpc_call *call)
        _enter("%d,%d,%d",
               call->acks_tail, call->acks_unacked, call->acks_head);
 
-       if (call->state >= RXRPC_CALL_COMPLETE)
+       if (call->state == RXRPC_CALL_COMPLETE)
                return;
 
        resend = 0;
@@ -376,7 +380,7 @@ static void rxrpc_rotate_tx_window(struct rxrpc_call *call, u32 hard)
                call->acks_hard++;
        }
 
-       wake_up(&call->tx_waitq);
+       wake_up(&call->waitq);
 }
 
 /*
@@ -407,6 +411,7 @@ static int rxrpc_drain_rx_oos_queue(struct rxrpc_call *call)
 
        skb = skb_dequeue(&call->rx_oos_queue);
        if (skb) {
+               rxrpc_see_skb(skb);
                sp = rxrpc_skb(skb);
 
                _debug("drain OOS packet %d [%d]",
@@ -427,6 +432,7 @@ static int rxrpc_drain_rx_oos_queue(struct rxrpc_call *call)
 
                        /* find out what the next packet is */
                        skb = skb_peek(&call->rx_oos_queue);
+                       rxrpc_see_skb(skb);
                        if (skb)
                                call->rx_first_oos = rxrpc_skb(skb)->hdr.seq;
                        else
@@ -459,8 +465,7 @@ static void rxrpc_insert_oos_packet(struct rxrpc_call *call,
        skb->destructor = rxrpc_packet_destructor;
        ASSERTCMP(sp->call, ==, NULL);
        sp->call = call;
-       rxrpc_get_call(call);
-       atomic_inc(&call->skb_count);
+       rxrpc_get_call_for_skb(call, skb);
 
        /* insert into the buffer in sequence order */
        spin_lock_bh(&call->lock);
@@ -546,7 +551,7 @@ static void rxrpc_extract_ackinfo(struct rxrpc_call *call, struct sk_buff *skb,
 
        mtu = min(ntohl(ackinfo.rxMTU), ntohl(ackinfo.maxMTU));
 
-       peer = call->conn->params.peer;
+       peer = call->peer;
        if (mtu < peer->maxdata) {
                spin_lock_bh(&peer->lock);
                peer->maxdata = mtu;
@@ -576,6 +581,7 @@ process_further:
        if (!skb)
                return -EAGAIN;
 
+       rxrpc_see_skb(skb);
        _net("deferred skb %p", skb);
 
        sp = rxrpc_skb(skb);
@@ -592,7 +598,8 @@ process_further:
 
                /* secured packets must be verified and possibly decrypted */
                if (call->conn->security->verify_packet(call, skb,
-                                                       _abort_code) < 0)
+                                                       sp->hdr.seq,
+                                                       sp->hdr.cksum) < 0)
                        goto protocol_error;
 
                rxrpc_insert_oos_packet(call, skb);
@@ -625,7 +632,7 @@ process_further:
                if (ack.reason == RXRPC_ACK_PING) {
                        _proto("Rx ACK %%%u PING Request", latest);
                        rxrpc_propose_ACK(call, RXRPC_ACK_PING_RESPONSE,
-                                         sp->hdr.serial, true);
+                                         skb->priority, sp->hdr.serial, true);
                }
 
                /* discard any out-of-order or duplicate ACKs */
@@ -704,7 +711,7 @@ all_acked:
                break;
        case RXRPC_CALL_SERVER_AWAIT_ACK:
                _debug("srv complete");
-               call->state = RXRPC_CALL_COMPLETE;
+               __rxrpc_call_completed(call);
                post_ACK = true;
                break;
        case RXRPC_CALL_CLIENT_SEND_REQUEST:
@@ -734,8 +741,7 @@ all_acked:
                _debug("post ACK");
                skb->mark = RXRPC_SKB_MARK_FINAL_ACK;
                sp->call = call;
-               rxrpc_get_call(call);
-               atomic_inc(&call->skb_count);
+               rxrpc_get_call_for_skb(call, skb);
                spin_lock_bh(&call->lock);
                if (rxrpc_queue_rcv_skb(call, skb, true, true) < 0)
                        BUG();
@@ -794,8 +800,7 @@ static int rxrpc_post_message(struct rxrpc_call *call, u32 mark, u32 error,
                memset(sp, 0, sizeof(*sp));
                sp->error = error;
                sp->call = call;
-               rxrpc_get_call(call);
-               atomic_inc(&call->skb_count);
+               rxrpc_get_call_for_skb(call, skb);
 
                spin_lock_bh(&call->lock);
                ret = rxrpc_queue_rcv_skb(call, skb, true, fatal);
@@ -807,8 +812,9 @@ static int rxrpc_post_message(struct rxrpc_call *call, u32 mark, u32 error,
 }
 
 /*
- * handle background processing of incoming call packets and ACK / abort
- * generation
+ * Handle background processing of incoming call packets and ACK / abort
+ * generation.  A ref on the call is donated to us by whoever queued the work
+ * item.
  */
 void rxrpc_process_call(struct work_struct *work)
 {
@@ -823,17 +829,20 @@ void rxrpc_process_call(struct work_struct *work)
        unsigned long bits;
        __be32 data, pad;
        size_t len;
+       bool requeue = false;
        int loop, nbit, ioc, ret, mtu;
        u32 serial, abort_code = RX_PROTOCOL_ERROR;
        u8 *acks = NULL;
 
+       rxrpc_see_call(call);
+
        //printk("\n--------------------\n");
        _enter("{%d,%s,%lx} [%lu]",
               call->debug_id, rxrpc_call_states[call->state], call->events,
               (jiffies - call->creation_jif) / (HZ / 10));
 
-       if (test_and_set_bit(RXRPC_CALL_PROC_BUSY, &call->flags)) {
-               _debug("XXXXXXXXXXXXX RUNNING ON MULTIPLE CPUS XXXXXXXXXXXXX");
+       if (call->state >= RXRPC_CALL_COMPLETE) {
+               rxrpc_put_call(call, rxrpc_call_put);
                return;
        }
 
@@ -842,8 +851,8 @@ void rxrpc_process_call(struct work_struct *work)
 
        /* there's a good chance we're going to have to send a message, so set
         * one up in advance */
-       msg.msg_name    = &call->conn->params.peer->srx.transport;
-       msg.msg_namelen = call->conn->params.peer->srx.transport_len;
+       msg.msg_name    = &call->peer->srx.transport;
+       msg.msg_namelen = call->peer->srx.transport_len;
        msg.msg_control = NULL;
        msg.msg_controllen = 0;
        msg.msg_flags   = 0;
@@ -867,30 +876,27 @@ skip_msg_init:
        /* deal with events of a final nature */
        if (test_bit(RXRPC_CALL_EV_RCVD_ERROR, &call->events)) {
                enum rxrpc_skb_mark mark;
-               int error;
 
                clear_bit(RXRPC_CALL_EV_CONN_ABORT, &call->events);
                clear_bit(RXRPC_CALL_EV_REJECT_BUSY, &call->events);
                clear_bit(RXRPC_CALL_EV_ABORT, &call->events);
 
-               error = call->error_report;
-               if (error < RXRPC_LOCAL_ERROR_OFFSET) {
+               if (call->completion == RXRPC_CALL_NETWORK_ERROR) {
                        mark = RXRPC_SKB_MARK_NET_ERROR;
-                       _debug("post net error %d", error);
+                       _debug("post net error %d", call->error);
                } else {
                        mark = RXRPC_SKB_MARK_LOCAL_ERROR;
-                       error -= RXRPC_LOCAL_ERROR_OFFSET;
-                       _debug("post net local error %d", error);
+                       _debug("post net local error %d", call->error);
                }
 
-               if (rxrpc_post_message(call, mark, error, true) < 0)
+               if (rxrpc_post_message(call, mark, call->error, true) < 0)
                        goto no_mem;
                clear_bit(RXRPC_CALL_EV_RCVD_ERROR, &call->events);
                goto kill_ACKs;
        }
 
        if (test_bit(RXRPC_CALL_EV_CONN_ABORT, &call->events)) {
-               ASSERTCMP(call->state, >, RXRPC_CALL_COMPLETE);
+               ASSERTCMP(call->state, ==, RXRPC_CALL_COMPLETE);
 
                clear_bit(RXRPC_CALL_EV_REJECT_BUSY, &call->events);
                clear_bit(RXRPC_CALL_EV_ABORT, &call->events);
@@ -898,7 +904,7 @@ skip_msg_init:
                _debug("post conn abort");
 
                if (rxrpc_post_message(call, RXRPC_SKB_MARK_LOCAL_ERROR,
-                                      call->conn->error, true) < 0)
+                                      call->error, true) < 0)
                        goto no_mem;
                clear_bit(RXRPC_CALL_EV_CONN_ABORT, &call->events);
                goto kill_ACKs;
@@ -911,13 +917,13 @@ skip_msg_init:
        }
 
        if (test_bit(RXRPC_CALL_EV_ABORT, &call->events)) {
-               ASSERTCMP(call->state, >, RXRPC_CALL_COMPLETE);
+               ASSERTCMP(call->state, ==, RXRPC_CALL_COMPLETE);
 
                if (rxrpc_post_message(call, RXRPC_SKB_MARK_LOCAL_ERROR,
-                                      ECONNABORTED, true) < 0)
+                                      call->error, true) < 0)
                        goto no_mem;
                whdr.type = RXRPC_PACKET_TYPE_ABORT;
-               data = htonl(call->local_abort);
+               data = htonl(call->abort_code);
                iov[1].iov_base = &data;
                iov[1].iov_len = sizeof(data);
                genbit = RXRPC_CALL_EV_ABORT;
@@ -977,13 +983,7 @@ skip_msg_init:
        }
 
        if (test_bit(RXRPC_CALL_EV_LIFE_TIMER, &call->events)) {
-               write_lock_bh(&call->state_lock);
-               if (call->state <= RXRPC_CALL_COMPLETE) {
-                       call->state = RXRPC_CALL_LOCALLY_ABORTED;
-                       call->local_abort = RX_CALL_TIMEOUT;
-                       set_bit(RXRPC_CALL_EV_ABORT, &call->events);
-               }
-               write_unlock_bh(&call->state_lock);
+               rxrpc_abort_call("EXP", call, 0, RX_CALL_TIMEOUT, ETIME);
 
                _debug("post timeout");
                if (rxrpc_post_message(call, RXRPC_SKB_MARK_LOCAL_ERROR,
@@ -996,7 +996,8 @@ skip_msg_init:
 
        /* deal with assorted inbound messages */
        if (!skb_queue_empty(&call->rx_queue)) {
-               switch (rxrpc_process_rx_queue(call, &abort_code)) {
+               ret = rxrpc_process_rx_queue(call, &abort_code);
+               switch (ret) {
                case 0:
                case -EAGAIN:
                        break;
@@ -1005,7 +1006,7 @@ skip_msg_init:
                case -EKEYEXPIRED:
                case -EKEYREJECTED:
                case -EPROTO:
-                       rxrpc_abort_call(call, abort_code);
+                       rxrpc_abort_call("PRO", call, 0, abort_code, -ret);
                        goto kill_ACKs;
                }
        }
@@ -1095,16 +1096,21 @@ skip_msg_init:
                spin_lock_bh(&call->lock);
 
                if (call->state == RXRPC_CALL_SERVER_SECURING) {
+                       struct rxrpc_sock *rx;
                        _debug("securing");
-                       write_lock(&call->socket->call_lock);
-                       if (!test_bit(RXRPC_CALL_RELEASED, &call->flags) &&
-                           !test_bit(RXRPC_CALL_EV_RELEASE, &call->events)) {
-                               _debug("not released");
-                               call->state = RXRPC_CALL_SERVER_ACCEPTING;
-                               list_move_tail(&call->accept_link,
-                                              &call->socket->acceptq);
+                       rcu_read_lock();
+                       rx = rcu_dereference(call->socket);
+                       if (rx) {
+                               write_lock(&rx->call_lock);
+                               if (!test_bit(RXRPC_CALL_RELEASED, &call->flags)) {
+                                       _debug("not released");
+                                       call->state = RXRPC_CALL_SERVER_ACCEPTING;
+                                       list_move_tail(&call->accept_link,
+                                                      &rx->acceptq);
+                               }
+                               write_unlock(&rx->call_lock);
                        }
-                       write_unlock(&call->socket->call_lock);
+                       rcu_read_unlock();
                        read_lock(&call->state_lock);
                        if (call->state < RXRPC_CALL_COMPLETE)
                                set_bit(RXRPC_CALL_EV_POST_ACCEPT, &call->events);
@@ -1146,20 +1152,14 @@ skip_msg_init:
                goto maybe_reschedule;
        }
 
-       if (test_bit(RXRPC_CALL_EV_RELEASE, &call->events)) {
-               rxrpc_release_call(call);
-               clear_bit(RXRPC_CALL_EV_RELEASE, &call->events);
-       }
-
        /* other events may have been raised since we started checking */
        goto maybe_reschedule;
 
 send_ACK_with_skew:
-       ack.maxSkew = htons(atomic_read(&call->conn->hi_serial) -
-                           ntohl(ack.serial));
+       ack.maxSkew = htons(call->ackr_skew);
 send_ACK:
-       mtu = call->conn->params.peer->if_mtu;
-       mtu -= call->conn->params.peer->hdrsize;
+       mtu = call->peer->if_mtu;
+       mtu -= call->peer->hdrsize;
        ackinfo.maxMTU  = htonl(mtu);
        ackinfo.rwind   = htonl(rxrpc_rx_window_size);
 
@@ -1217,10 +1217,8 @@ send_message_2:
                             &msg, iov, ioc, len);
        if (ret < 0) {
                _debug("sendmsg failed: %d", ret);
-               read_lock_bh(&call->state_lock);
-               if (call->state < RXRPC_CALL_DEAD)
-                       rxrpc_queue_call(call);
-               read_unlock_bh(&call->state_lock);
+               if (call->state < RXRPC_CALL_COMPLETE)
+                       requeue = true;
                goto error;
        }
 
@@ -1231,10 +1229,7 @@ send_message_2:
                goto kill_ACKs;
 
        case RXRPC_CALL_EV_ACK_FINAL:
-               write_lock_bh(&call->state_lock);
-               if (call->state == RXRPC_CALL_CLIENT_FINAL_ACK)
-                       call->state = RXRPC_CALL_COMPLETE;
-               write_unlock_bh(&call->state_lock);
+               rxrpc_call_completed(call);
                goto kill_ACKs;
 
        default:
@@ -1246,7 +1241,8 @@ send_message_2:
                case RXRPC_CALL_SERVER_ACK_REQUEST:
                        _debug("start ACK timer");
                        rxrpc_propose_ACK(call, RXRPC_ACK_DELAY,
-                                         call->ackr_serial, false);
+                                         call->ackr_skew, call->ackr_serial,
+                                         false);
                default:
                        break;
                }
@@ -1255,42 +1251,22 @@ send_message_2:
 
 kill_ACKs:
        del_timer_sync(&call->ack_timer);
-       if (test_and_clear_bit(RXRPC_CALL_EV_ACK_FINAL, &call->events))
-               rxrpc_put_call(call);
        clear_bit(RXRPC_CALL_EV_ACK, &call->events);
 
 maybe_reschedule:
        if (call->events || !skb_queue_empty(&call->rx_queue)) {
-               read_lock_bh(&call->state_lock);
-               if (call->state < RXRPC_CALL_DEAD)
-                       rxrpc_queue_call(call);
-               read_unlock_bh(&call->state_lock);
-       }
-
-       /* don't leave aborted connections on the accept queue */
-       if (call->state >= RXRPC_CALL_COMPLETE &&
-           !list_empty(&call->accept_link)) {
-               _debug("X unlinking once-pending call %p { e=%lx f=%lx c=%x }",
-                      call, call->events, call->flags, call->conn->proto.cid);
-
-               read_lock_bh(&call->state_lock);
-               if (!test_bit(RXRPC_CALL_RELEASED, &call->flags) &&
-                   !test_and_set_bit(RXRPC_CALL_EV_RELEASE, &call->events))
-                       rxrpc_queue_call(call);
-               read_unlock_bh(&call->state_lock);
+               if (call->state < RXRPC_CALL_COMPLETE)
+                       requeue = true;
        }
 
 error:
-       clear_bit(RXRPC_CALL_PROC_BUSY, &call->flags);
        kfree(acks);
 
-       /* because we don't want two CPUs both processing the work item for one
-        * call at the same time, we use a flag to note when it's busy; however
-        * this means there's a race between clearing the flag and setting the
-        * work pending bit and the work item being processed again */
-       if (call->events && !work_pending(&call->processor)) {
+       if ((requeue || call->events) && !work_pending(&call->processor)) {
                _debug("jumpstart %x", call->conn->proto.cid);
-               rxrpc_queue_call(call);
+               __rxrpc_queue_call(call);
+       } else {
+               rxrpc_put_call(call, rxrpc_call_put);
        }
 
        _leave("");