rxrpc: Use irq-disabling spinlocks between app and I/O thread
author: David Howells <dhowells@redhat.com>
Wed, 4 Dec 2024 07:47:01 +0000 (07:47 +0000)
committer: Jakub Kicinski <kuba@kernel.org>
Mon, 9 Dec 2024 21:48:31 +0000 (13:48 -0800)
Where a spinlock is used by both the application thread and the I/O thread,
use irq-disabling locking so that an interrupt taken on the app thread
doesn't also slow down the I/O thread.

Signed-off-by: David Howells <dhowells@redhat.com>
cc: Marc Dionne <marc.dionne@auristor.com>
cc: linux-afs@lists.infradead.org
Signed-off-by: Jakub Kicinski <kuba@kernel.org>
14 files changed:
net/rxrpc/af_rxrpc.c
net/rxrpc/ar-internal.h
net/rxrpc/call_accept.c
net/rxrpc/call_object.c
net/rxrpc/conn_client.c
net/rxrpc/conn_event.c
net/rxrpc/conn_object.c
net/rxrpc/input.c
net/rxrpc/io_thread.c
net/rxrpc/peer_event.c
net/rxrpc/peer_object.c
net/rxrpc/recvmsg.c
net/rxrpc/security.c
net/rxrpc/sendmsg.c

index 9d8bd0b37e41da9f99e2661ae4a29569f5eab650..86873399f7d5076788daf589c9ce5943ea25983f 100644 (file)
@@ -408,9 +408,9 @@ void rxrpc_kernel_shutdown_call(struct socket *sock, struct rxrpc_call *call)
 
                /* Make sure we're not going to call back into a kernel service */
                if (call->notify_rx) {
-                       spin_lock(&call->notify_lock);
+                       spin_lock_irq(&call->notify_lock);
                        call->notify_rx = rxrpc_dummy_notify_rx;
-                       spin_unlock(&call->notify_lock);
+                       spin_unlock_irq(&call->notify_lock);
                }
        }
        mutex_unlock(&call->user_mutex);
index db93d7f78902f3d86b5866bd89716b945e22ebb6..ffd80dc88f40f594ed93144a8526b05210a35318 100644 (file)
@@ -700,7 +700,6 @@ struct rxrpc_call {
        struct rxrpc_txqueue    *send_queue;    /* Queue that sendmsg is writing into */
 
        /* Transmitted data tracking. */
-       spinlock_t              tx_lock;        /* Transmit queue lock */
        struct rxrpc_txqueue    *tx_queue;      /* Start of transmission buffers */
        struct rxrpc_txqueue    *tx_qtail;      /* End of transmission buffers */
        rxrpc_seq_t             tx_qbase;       /* First slot in tx_queue */
index a6776b1604bab68812b46be3c15a81f0c032adc9..e685034ce4f7cecb3595b57257969ea7940dee92 100644 (file)
@@ -188,8 +188,8 @@ void rxrpc_discard_prealloc(struct rxrpc_sock *rx)
        /* Make sure that there aren't any incoming calls in progress before we
         * clear the preallocation buffers.
         */
-       spin_lock(&rx->incoming_lock);
-       spin_unlock(&rx->incoming_lock);
+       spin_lock_irq(&rx->incoming_lock);
+       spin_unlock_irq(&rx->incoming_lock);
 
        head = b->peer_backlog_head;
        tail = b->peer_backlog_tail;
@@ -343,7 +343,7 @@ bool rxrpc_new_incoming_call(struct rxrpc_local *local,
        if (sp->hdr.type != RXRPC_PACKET_TYPE_DATA)
                return rxrpc_protocol_error(skb, rxrpc_eproto_no_service_call);
 
-       read_lock(&local->services_lock);
+       read_lock_irq(&local->services_lock);
 
        /* Weed out packets to services we're not offering.  Packets that would
         * begin a call are explicitly rejected and the rest are just
@@ -399,12 +399,12 @@ bool rxrpc_new_incoming_call(struct rxrpc_local *local,
        spin_unlock(&conn->state_lock);
 
        spin_unlock(&rx->incoming_lock);
-       read_unlock(&local->services_lock);
+       read_unlock_irq(&local->services_lock);
 
        if (hlist_unhashed(&call->error_link)) {
-               spin_lock(&call->peer->lock);
+               spin_lock_irq(&call->peer->lock);
                hlist_add_head(&call->error_link, &call->peer->error_targets);
-               spin_unlock(&call->peer->lock);
+               spin_unlock_irq(&call->peer->lock);
        }
 
        _leave(" = %p{%d}", call, call->debug_id);
@@ -413,20 +413,20 @@ bool rxrpc_new_incoming_call(struct rxrpc_local *local,
        return true;
 
 unsupported_service:
-       read_unlock(&local->services_lock);
+       read_unlock_irq(&local->services_lock);
        return rxrpc_direct_abort(skb, rxrpc_abort_service_not_offered,
                                  RX_INVALID_OPERATION, -EOPNOTSUPP);
 unsupported_security:
-       read_unlock(&local->services_lock);
+       read_unlock_irq(&local->services_lock);
        return rxrpc_direct_abort(skb, rxrpc_abort_service_not_offered,
                                  RX_INVALID_OPERATION, -EKEYREJECTED);
 no_call:
        spin_unlock(&rx->incoming_lock);
-       read_unlock(&local->services_lock);
+       read_unlock_irq(&local->services_lock);
        _leave(" = f [%u]", skb->mark);
        return false;
 discard:
-       read_unlock(&local->services_lock);
+       read_unlock_irq(&local->services_lock);
        return true;
 }
 
index e0644e9a8d218fe5b94f6cf908e1727a0fdbdccc..75cd0b06e14cb6e304fa5d4b27ce31b1b39465a3 100644 (file)
@@ -49,7 +49,7 @@ void rxrpc_poke_call(struct rxrpc_call *call, enum rxrpc_call_poke_trace what)
        bool busy;
 
        if (!test_bit(RXRPC_CALL_DISCONNECTED, &call->flags)) {
-               spin_lock_bh(&local->lock);
+               spin_lock_irq(&local->lock);
                busy = !list_empty(&call->attend_link);
                trace_rxrpc_poke_call(call, busy, what);
                if (!busy && !rxrpc_try_get_call(call, rxrpc_call_get_poke))
@@ -57,7 +57,7 @@ void rxrpc_poke_call(struct rxrpc_call *call, enum rxrpc_call_poke_trace what)
                if (!busy) {
                        list_add_tail(&call->attend_link, &local->call_attend_q);
                }
-               spin_unlock_bh(&local->lock);
+               spin_unlock_irq(&local->lock);
                if (!busy)
                        rxrpc_wake_up_io_thread(local);
        }
@@ -151,7 +151,6 @@ struct rxrpc_call *rxrpc_alloc_call(struct rxrpc_sock *rx, gfp_t gfp,
        skb_queue_head_init(&call->rx_oos_queue);
        init_waitqueue_head(&call->waitq);
        spin_lock_init(&call->notify_lock);
-       spin_lock_init(&call->tx_lock);
        refcount_set(&call->ref, 1);
        call->debug_id          = debug_id;
        call->tx_total_len      = -1;
@@ -302,9 +301,9 @@ static int rxrpc_connect_call(struct rxrpc_call *call, gfp_t gfp)
 
        trace_rxrpc_client(NULL, -1, rxrpc_client_queue_new_call);
        rxrpc_get_call(call, rxrpc_call_get_io_thread);
-       spin_lock(&local->client_call_lock);
+       spin_lock_irq(&local->client_call_lock);
        list_add_tail(&call->wait_link, &local->new_client_calls);
-       spin_unlock(&local->client_call_lock);
+       spin_unlock_irq(&local->client_call_lock);
        rxrpc_wake_up_io_thread(local);
        return 0;
 
@@ -434,7 +433,7 @@ error_attached_to_socket:
 
 /*
  * Set up an incoming call.  call->conn points to the connection.
- * This is called in BH context and isn't allowed to fail.
+ * This is called with interrupts disabled and isn't allowed to fail.
  */
 void rxrpc_incoming_call(struct rxrpc_sock *rx,
                         struct rxrpc_call *call,
@@ -576,7 +575,7 @@ void rxrpc_release_call(struct rxrpc_sock *rx, struct rxrpc_call *call)
        rxrpc_put_call_slot(call);
 
        /* Make sure we don't get any more notifications */
-       spin_lock(&rx->recvmsg_lock);
+       spin_lock_irq(&rx->recvmsg_lock);
 
        if (!list_empty(&call->recvmsg_link)) {
                _debug("unlinking once-pending call %p { e=%lx f=%lx }",
@@ -589,7 +588,7 @@ void rxrpc_release_call(struct rxrpc_sock *rx, struct rxrpc_call *call)
        call->recvmsg_link.next = NULL;
        call->recvmsg_link.prev = NULL;
 
-       spin_unlock(&rx->recvmsg_lock);
+       spin_unlock_irq(&rx->recvmsg_lock);
        if (put)
                rxrpc_put_call(call, rxrpc_call_put_unnotify);
 
index 5f76bd90567c0f9df61a6a29cbe4aff4657306b7..db00991978906682bd8e11bbaf8a5c77f3dbc840 100644 (file)
@@ -510,9 +510,9 @@ void rxrpc_connect_client_calls(struct rxrpc_local *local)
        struct rxrpc_call *call;
        LIST_HEAD(new_client_calls);
 
-       spin_lock(&local->client_call_lock);
+       spin_lock_irq(&local->client_call_lock);
        list_splice_tail_init(&local->new_client_calls, &new_client_calls);
-       spin_unlock(&local->client_call_lock);
+       spin_unlock_irq(&local->client_call_lock);
 
        while ((call = list_first_entry_or_null(&new_client_calls,
                                                struct rxrpc_call, wait_link))) {
@@ -547,9 +547,9 @@ void rxrpc_expose_client_call(struct rxrpc_call *call)
                        set_bit(RXRPC_CONN_DONT_REUSE, &conn->flags);
                trace_rxrpc_client(conn, channel, rxrpc_client_exposed);
 
-               spin_lock(&call->peer->lock);
+               spin_lock_irq(&call->peer->lock);
                hlist_add_head(&call->error_link, &call->peer->error_targets);
-               spin_unlock(&call->peer->lock);
+               spin_unlock_irq(&call->peer->lock);
        }
 }
 
@@ -590,9 +590,9 @@ void rxrpc_disconnect_client_call(struct rxrpc_bundle *bundle, struct rxrpc_call
                ASSERTCMP(call->call_id, ==, 0);
                ASSERT(!test_bit(RXRPC_CALL_EXPOSED, &call->flags));
                /* May still be on ->new_client_calls. */
-               spin_lock(&local->client_call_lock);
+               spin_lock_irq(&local->client_call_lock);
                list_del_init(&call->wait_link);
-               spin_unlock(&local->client_call_lock);
+               spin_unlock_irq(&local->client_call_lock);
                return;
        }
 
index f6c02cc44d98edff300104a4fc8616ea25c3c3bc..6b29a294ee07d568f21c04aa49dbf99b794e8e54 100644 (file)
@@ -26,7 +26,7 @@ static bool rxrpc_set_conn_aborted(struct rxrpc_connection *conn, struct sk_buff
        bool aborted = false;
 
        if (conn->state != RXRPC_CONN_ABORTED) {
-               spin_lock(&conn->state_lock);
+               spin_lock_irq(&conn->state_lock);
                if (conn->state != RXRPC_CONN_ABORTED) {
                        conn->abort_code = abort_code;
                        conn->error      = err;
@@ -37,7 +37,7 @@ static bool rxrpc_set_conn_aborted(struct rxrpc_connection *conn, struct sk_buff
                        set_bit(RXRPC_CONN_EV_ABORT_CALLS, &conn->events);
                        aborted = true;
                }
-               spin_unlock(&conn->state_lock);
+               spin_unlock_irq(&conn->state_lock);
        }
 
        return aborted;
@@ -261,10 +261,10 @@ static int rxrpc_process_event(struct rxrpc_connection *conn,
                if (ret < 0)
                        return ret;
 
-               spin_lock(&conn->state_lock);
+               spin_lock_irq(&conn->state_lock);
                if (conn->state == RXRPC_CONN_SERVICE_CHALLENGING)
                        conn->state = RXRPC_CONN_SERVICE;
-               spin_unlock(&conn->state_lock);
+               spin_unlock_irq(&conn->state_lock);
 
                if (conn->state == RXRPC_CONN_SERVICE) {
                        /* Offload call state flipping to the I/O thread.  As
index b0627398311b404ad369a9561a42069e91f186d6..7eba4d7d9a38027d067377eaca80eae3944f873a 100644 (file)
@@ -31,13 +31,13 @@ void rxrpc_poke_conn(struct rxrpc_connection *conn, enum rxrpc_conn_trace why)
        if (WARN_ON_ONCE(!local))
                return;
 
-       spin_lock_bh(&local->lock);
+       spin_lock_irq(&local->lock);
        busy = !list_empty(&conn->attend_link);
        if (!busy) {
                rxrpc_get_connection(conn, why);
                list_add_tail(&conn->attend_link, &local->conn_attend_q);
        }
-       spin_unlock_bh(&local->lock);
+       spin_unlock_irq(&local->lock);
        rxrpc_wake_up_io_thread(local);
 }
 
@@ -196,9 +196,9 @@ void rxrpc_disconnect_call(struct rxrpc_call *call)
        call->peer->cong_ssthresh = call->cong_ssthresh;
 
        if (!hlist_unhashed(&call->error_link)) {
-               spin_lock(&call->peer->lock);
+               spin_lock_irq(&call->peer->lock);
                hlist_del_init(&call->error_link);
-               spin_unlock(&call->peer->lock);
+               spin_unlock_irq(&call->peer->lock);
        }
 
        if (rxrpc_is_client_call(call)) {
index a7a249872a541d3a590a3ad143ea2351dc60957c..821e10c030868ecdd6ba1da4a047134209b5cb6c 100644 (file)
@@ -424,7 +424,7 @@ static void rxrpc_input_queue_data(struct rxrpc_call *call, struct sk_buff *skb,
        struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
        bool last = sp->hdr.flags & RXRPC_LAST_PACKET;
 
-       __skb_queue_tail(&call->recvmsg_queue, skb);
+       skb_queue_tail(&call->recvmsg_queue, skb);
        rxrpc_input_update_ack_window(call, window, wtop);
        trace_rxrpc_receive(call, last ? why + 1 : why, sp->hdr.serial, sp->hdr.seq);
        if (last)
@@ -501,7 +501,6 @@ static void rxrpc_input_data_one(struct rxrpc_call *call, struct sk_buff *skb,
 
                rxrpc_get_skb(skb, rxrpc_skb_get_to_recvmsg);
 
-               spin_lock(&call->recvmsg_queue.lock);
                rxrpc_input_queue_data(call, skb, window, wtop, rxrpc_receive_queue);
                *_notify = true;
 
@@ -523,8 +522,6 @@ static void rxrpc_input_data_one(struct rxrpc_call *call, struct sk_buff *skb,
                                               rxrpc_receive_queue_oos);
                }
 
-               spin_unlock(&call->recvmsg_queue.lock);
-
                call->ackr_sack_base = sack;
        } else {
                unsigned int slot;
index bc678a299bd87553b5e7bff648ad60d2ed82eb53..fbacf2056f643f5f9fc5f45626fca74141fde29a 100644 (file)
@@ -500,9 +500,9 @@ int rxrpc_io_thread(void *data)
                }
 
                /* Deal with connections that want immediate attention. */
-               spin_lock_bh(&local->lock);
+               spin_lock_irq(&local->lock);
                list_splice_tail_init(&local->conn_attend_q, &conn_attend_q);
-               spin_unlock_bh(&local->lock);
+               spin_unlock_irq(&local->lock);
 
                while ((conn = list_first_entry_or_null(&conn_attend_q,
                                                        struct rxrpc_connection,
@@ -519,9 +519,9 @@ int rxrpc_io_thread(void *data)
                        rxrpc_discard_expired_client_conns(local);
 
                /* Deal with calls that want immediate attention. */
-               spin_lock_bh(&local->lock);
+               spin_lock_irq(&local->lock);
                list_splice_tail_init(&local->call_attend_q, &call_attend_q);
-               spin_unlock_bh(&local->lock);
+               spin_unlock_irq(&local->lock);
 
                while ((call = list_first_entry_or_null(&call_attend_q,
                                                        struct rxrpc_call,
index ff30e0c055079d9fb8b58d1f1bc5e84796d8d657..d82e44a3901b0726a9570c226ed2db486f63d385 100644 (file)
@@ -213,23 +213,23 @@ static void rxrpc_distribute_error(struct rxrpc_peer *peer, struct sk_buff *skb,
        struct rxrpc_call *call;
        HLIST_HEAD(error_targets);
 
-       spin_lock(&peer->lock);
+       spin_lock_irq(&peer->lock);
        hlist_move_list(&peer->error_targets, &error_targets);
 
        while (!hlist_empty(&error_targets)) {
                call = hlist_entry(error_targets.first,
                                   struct rxrpc_call, error_link);
                hlist_del_init(&call->error_link);
-               spin_unlock(&peer->lock);
+               spin_unlock_irq(&peer->lock);
 
                rxrpc_see_call(call, rxrpc_call_see_distribute_error);
                rxrpc_set_call_completion(call, compl, 0, -err);
                rxrpc_input_call_event(call);
 
-               spin_lock(&peer->lock);
+               spin_lock_irq(&peer->lock);
        }
 
-       spin_unlock(&peer->lock);
+       spin_unlock_irq(&peer->lock);
 }
 
 /*
index 80ef6f06d5122c81815a5bd27672fc1d18f1645c..27b34ed4d76aa33f8aede02dfd530d84acc7c3fc 100644 (file)
@@ -320,6 +320,7 @@ static void rxrpc_free_peer(struct rxrpc_peer *peer)
  * Set up a new incoming peer.  There shouldn't be any other matching peers
  * since we've already done a search in the list from the non-reentrant context
  * (the data_ready handler) that is the only place we can add new peers.
+ * Called with interrupts disabled.
  */
 void rxrpc_new_incoming_peer(struct rxrpc_local *local, struct rxrpc_peer *peer)
 {
index a482f88c5fc5b693ab8c95c2a06e34647a59e9cd..32cd5f1d541dbaa40515d322b6febad18faf6193 100644 (file)
@@ -36,16 +36,16 @@ void rxrpc_notify_socket(struct rxrpc_call *call)
        sk = &rx->sk;
        if (rx && sk->sk_state < RXRPC_CLOSE) {
                if (call->notify_rx) {
-                       spin_lock(&call->notify_lock);
+                       spin_lock_irq(&call->notify_lock);
                        call->notify_rx(sk, call, call->user_call_ID);
-                       spin_unlock(&call->notify_lock);
+                       spin_unlock_irq(&call->notify_lock);
                } else {
-                       spin_lock(&rx->recvmsg_lock);
+                       spin_lock_irq(&rx->recvmsg_lock);
                        if (list_empty(&call->recvmsg_link)) {
                                rxrpc_get_call(call, rxrpc_call_get_notify_socket);
                                list_add_tail(&call->recvmsg_link, &rx->recvmsg_q);
                        }
-                       spin_unlock(&rx->recvmsg_lock);
+                       spin_unlock_irq(&rx->recvmsg_lock);
 
                        if (!sock_flag(sk, SOCK_DEAD)) {
                                _debug("call %ps", sk->sk_data_ready);
@@ -337,14 +337,14 @@ try_again:
         * We also want to weed out calls that got requeued whilst we were
         * shovelling data out.
         */
-       spin_lock(&rx->recvmsg_lock);
+       spin_lock_irq(&rx->recvmsg_lock);
        l = rx->recvmsg_q.next;
        call = list_entry(l, struct rxrpc_call, recvmsg_link);
 
        if (!rxrpc_call_is_complete(call) &&
            skb_queue_empty(&call->recvmsg_queue)) {
                list_del_init(&call->recvmsg_link);
-               spin_unlock(&rx->recvmsg_lock);
+               spin_unlock_irq(&rx->recvmsg_lock);
                release_sock(&rx->sk);
                trace_rxrpc_recvmsg(call->debug_id, rxrpc_recvmsg_unqueue, 0);
                rxrpc_put_call(call, rxrpc_call_put_recvmsg);
@@ -355,7 +355,7 @@ try_again:
                list_del_init(&call->recvmsg_link);
        else
                rxrpc_get_call(call, rxrpc_call_get_recvmsg);
-       spin_unlock(&rx->recvmsg_lock);
+       spin_unlock_irq(&rx->recvmsg_lock);
 
        call_debug_id = call->debug_id;
        trace_rxrpc_recvmsg(call_debug_id, rxrpc_recvmsg_dequeue, 0);
@@ -445,9 +445,9 @@ error_unlock_call:
 
 error_requeue_call:
        if (!(flags & MSG_PEEK)) {
-               spin_lock(&rx->recvmsg_lock);
+               spin_lock_irq(&rx->recvmsg_lock);
                list_add(&call->recvmsg_link, &rx->recvmsg_q);
-               spin_unlock(&rx->recvmsg_lock);
+               spin_unlock_irq(&rx->recvmsg_lock);
                trace_rxrpc_recvmsg(call_debug_id, rxrpc_recvmsg_requeue, 0);
        } else {
                rxrpc_put_call(call, rxrpc_call_put_recvmsg);
index cb8dd1d3b1d49ed9dc26bdfa301bc263174cd876..9784adc8f275939c099bae9595edfbe27a47f18e 100644 (file)
@@ -114,10 +114,10 @@ found:
        if (conn->state == RXRPC_CONN_CLIENT_UNSECURED) {
                ret = conn->security->init_connection_security(conn, token);
                if (ret == 0) {
-                       spin_lock(&conn->state_lock);
+                       spin_lock_irq(&conn->state_lock);
                        if (conn->state == RXRPC_CONN_CLIENT_UNSECURED)
                                conn->state = RXRPC_CONN_CLIENT;
-                       spin_unlock(&conn->state_lock);
+                       spin_unlock_irq(&conn->state_lock);
                }
        }
        mutex_unlock(&conn->security_lock);
index 381b25597f4e88fadcffd0c61bcaf719606e0f4e..df501a7c92fa365bafa10a20fdd5adbd3dcb3c04 100644 (file)
@@ -261,7 +261,6 @@ static void rxrpc_queue_packet(struct rxrpc_sock *rx, struct rxrpc_call *call,
                trace_rxrpc_tq(call, sq, seq, rxrpc_tq_queue);
 
        /* Add the packet to the call's output buffer */
-       spin_lock(&call->tx_lock);
        poke = (READ_ONCE(call->tx_bottom) == call->send_top);
        sq->bufs[ix] = txb;
        /* Order send_top after the queue->next pointer and txb content. */
@@ -270,7 +269,6 @@ static void rxrpc_queue_packet(struct rxrpc_sock *rx, struct rxrpc_call *call,
                rxrpc_notify_end_tx(rx, call, notify_end_tx);
                call->send_queue = NULL;
        }
-       spin_unlock(&call->tx_lock);
 
        if (poke)
                rxrpc_poke_call(call, rxrpc_call_poke_start);