rxrpc: Implement a mechanism to send an event notification to a connection
authorDavid Howells <dhowells@redhat.com>
Thu, 20 Oct 2022 08:08:34 +0000 (09:08 +0100)
committerDavid Howells <dhowells@redhat.com>
Fri, 6 Jan 2023 09:43:31 +0000 (09:43 +0000)
Provide a means by which an event notification can be sent to a connection
through such that the I/O thread can pick it up and handle it rather than
doing it in a separate workqueue.

This is then used to move the deferred final ACK of a call into the I/O
thread rather than a separate work queue as part of the drive to do all
transmission from the I/O thread.

Signed-off-by: David Howells <dhowells@redhat.com>
cc: Marc Dionne <marc.dionne@auristor.com>
cc: linux-afs@lists.infradead.org

include/trace/events/rxrpc.h
net/rxrpc/ar-internal.h
net/rxrpc/conn_event.c
net/rxrpc/conn_object.c
net/rxrpc/io_thread.c
net/rxrpc/local_object.c

index eac513668e3338385857c47cd3acff03f3d19ea5..b969756f97fce43ea1507a8d81b06a4427f0a2c4 100644 (file)
        EM(rxrpc_conn_get_call_input,           "GET inp-call") \
        EM(rxrpc_conn_get_conn_input,           "GET inp-conn") \
        EM(rxrpc_conn_get_idle,                 "GET idle    ") \
-       EM(rxrpc_conn_get_poke,                 "GET poke    ") \
+       EM(rxrpc_conn_get_poke_timer,           "GET poke    ") \
        EM(rxrpc_conn_get_service_conn,         "GET svc-conn") \
        EM(rxrpc_conn_new_client,               "NEW client  ") \
        EM(rxrpc_conn_new_service,              "NEW service ") \
        EM(rxrpc_conn_put_service_reaped,       "PUT svc-reap") \
        EM(rxrpc_conn_put_unbundle,             "PUT unbundle") \
        EM(rxrpc_conn_put_unidle,               "PUT unidle  ") \
+       EM(rxrpc_conn_put_work,                 "PUT work    ") \
        EM(rxrpc_conn_queue_challenge,          "QUE chall   ") \
-       EM(rxrpc_conn_queue_retry_work,         "QUE retry-wk") \
        EM(rxrpc_conn_queue_rx_work,            "QUE rx-work ") \
-       EM(rxrpc_conn_queue_timer,              "QUE timer   ") \
        EM(rxrpc_conn_see_new_service_conn,     "SEE new-svc ") \
        EM(rxrpc_conn_see_reap_service,         "SEE reap-svc") \
        E_(rxrpc_conn_see_work,                 "SEE work    ")
index 0cf28a56aec5d6063ff753b6ac6834d7b7419351..d82d7f36cdaa0e7cdfe36fe8ccd079e79335f6f5 100644 (file)
@@ -202,6 +202,7 @@ struct rxrpc_host_header {
  * - max 48 bytes (struct sk_buff::cb)
  */
 struct rxrpc_skb_priv {
+       struct rxrpc_connection *conn;  /* Connection referred to (poke packet) */
        u16             offset;         /* Offset of data */
        u16             len;            /* Length of data */
        u8              flags;
@@ -292,6 +293,7 @@ struct rxrpc_local {
        struct rxrpc_sock __rcu *service;       /* Service(s) listening on this endpoint */
        struct rw_semaphore     defrag_sem;     /* control re-enablement of IP DF bit */
        struct sk_buff_head     rx_queue;       /* Received packets */
+       struct list_head        conn_attend_q;  /* Conns requiring immediate attention */
        struct list_head        call_attend_q;  /* Calls requiring immediate attention */
        struct rb_root          client_bundles; /* Client connection bundles by socket params */
        spinlock_t              client_bundles_lock; /* Lock for client_bundles */
@@ -441,6 +443,7 @@ struct rxrpc_connection {
        struct rxrpc_peer       *peer;          /* Remote endpoint */
        struct rxrpc_net        *rxnet;         /* Network namespace to which call belongs */
        struct key              *key;           /* Security details */
+       struct list_head        attend_link;    /* Link in local->conn_attend_q */
 
        refcount_t              ref;
        atomic_t                active;         /* Active count for service conns */
@@ -905,6 +908,7 @@ void rxrpc_conn_retransmit_call(struct rxrpc_connection *conn, struct sk_buff *s
 void rxrpc_process_connection(struct work_struct *);
 void rxrpc_process_delayed_final_acks(struct rxrpc_connection *, bool);
 int rxrpc_input_conn_packet(struct rxrpc_connection *conn, struct sk_buff *skb);
+void rxrpc_input_conn_event(struct rxrpc_connection *conn, struct sk_buff *skb);
 
 /*
  * conn_object.c
@@ -912,6 +916,7 @@ int rxrpc_input_conn_packet(struct rxrpc_connection *conn, struct sk_buff *skb);
 extern unsigned int rxrpc_connection_expiry;
 extern unsigned int rxrpc_closed_conn_expiry;
 
+void rxrpc_poke_conn(struct rxrpc_connection *conn, enum rxrpc_conn_trace why);
 struct rxrpc_connection *rxrpc_alloc_connection(struct rxrpc_net *, gfp_t);
 struct rxrpc_connection *rxrpc_find_client_connection_rcu(struct rxrpc_local *,
                                                          struct sockaddr_rxrpc *,
index dfd29882126f62055dd50f37a75562f87b14f7c8..7a980a32344f1c8d5055430b8f3b32ea8ddf670e 100644 (file)
@@ -412,10 +412,6 @@ static void rxrpc_do_process_connection(struct rxrpc_connection *conn)
        if (test_and_clear_bit(RXRPC_CONN_EV_CHALLENGE, &conn->events))
                rxrpc_secure_connection(conn);
 
-       /* Process delayed ACKs whose time has come. */
-       if (conn->flags & RXRPC_CONN_FINAL_ACK_MASK)
-               rxrpc_process_delayed_final_acks(conn, false);
-
        /* go through the conn-level event packets, releasing the ref on this
         * connection that each one has when we've finished with it */
        while ((skb = skb_dequeue(&conn->rx_queue))) {
@@ -515,3 +511,13 @@ int rxrpc_input_conn_packet(struct rxrpc_connection *conn, struct sk_buff *skb)
                return -EPROTO;
        }
 }
+
+/*
+ * Input a connection event.
+ */
+void rxrpc_input_conn_event(struct rxrpc_connection *conn, struct sk_buff *skb)
+{
+       /* Process delayed ACKs whose time has come. */
+       if (conn->flags & RXRPC_CONN_FINAL_ACK_MASK)
+               rxrpc_process_delayed_final_acks(conn, false);
+}
index 2bd3f62888956ce3c8f4986bc8a1593ea01ffab5..281f59e356f5df837144b20e6aa264c9ab6a55ec 100644 (file)
@@ -23,12 +23,30 @@ static void rxrpc_clean_up_connection(struct work_struct *work);
 static void rxrpc_set_service_reap_timer(struct rxrpc_net *rxnet,
                                         unsigned long reap_at);
 
+void rxrpc_poke_conn(struct rxrpc_connection *conn, enum rxrpc_conn_trace why)
+{
+       struct rxrpc_local *local = conn->local;
+       bool busy;
+
+       if (WARN_ON_ONCE(!local))
+               return;
+
+       spin_lock_bh(&local->lock);
+       busy = !list_empty(&conn->attend_link);
+       if (!busy) {
+               rxrpc_get_connection(conn, why);
+               list_add_tail(&conn->attend_link, &local->conn_attend_q);
+       }
+       spin_unlock_bh(&local->lock);
+       rxrpc_wake_up_io_thread(local);
+}
+
 static void rxrpc_connection_timer(struct timer_list *timer)
 {
        struct rxrpc_connection *conn =
                container_of(timer, struct rxrpc_connection, timer);
 
-       rxrpc_queue_conn(conn, rxrpc_conn_queue_timer);
+       rxrpc_poke_conn(conn, rxrpc_conn_get_poke_timer);
 }
 
 /*
index 0e1a548d35f8ee7040b377d2c7966ae2df3d4e04..46e58cf5bc96edbd811b5cda00015204f194bc62 100644 (file)
@@ -421,6 +421,7 @@ reject_packet:
  */
 int rxrpc_io_thread(void *data)
 {
+       struct rxrpc_connection *conn;
        struct sk_buff_head rx_queue;
        struct rxrpc_local *local = data;
        struct rxrpc_call *call;
@@ -436,6 +437,20 @@ int rxrpc_io_thread(void *data)
        for (;;) {
                rxrpc_inc_stat(local->rxnet, stat_io_loop);
 
+               /* Deal with connections that want immediate attention. */
+               conn = list_first_entry_or_null(&local->conn_attend_q,
+                                               struct rxrpc_connection,
+                                               attend_link);
+               if (conn) {
+                       spin_lock_bh(&local->lock);
+                       list_del_init(&conn->attend_link);
+                       spin_unlock_bh(&local->lock);
+
+                       rxrpc_input_conn_event(conn, NULL);
+                       rxrpc_put_connection(conn, rxrpc_conn_put_poke);
+                       continue;
+               }
+
                /* Deal with calls that want immediate attention. */
                if ((call = list_first_entry_or_null(&local->call_attend_q,
                                                     struct rxrpc_call,
@@ -463,6 +478,7 @@ int rxrpc_io_thread(void *data)
                                rxrpc_input_error(local, skb);
                                rxrpc_free_skb(skb, rxrpc_skb_put_error_report);
                                break;
+                               break;
                        default:
                                WARN_ON_ONCE(1);
                                rxrpc_free_skb(skb, rxrpc_skb_put_unknown);
@@ -481,7 +497,8 @@ int rxrpc_io_thread(void *data)
                set_current_state(TASK_INTERRUPTIBLE);
                should_stop = kthread_should_stop();
                if (!skb_queue_empty(&local->rx_queue) ||
-                   !list_empty(&local->call_attend_q)) {
+                   !list_empty(&local->call_attend_q) ||
+                   !list_empty(&local->conn_attend_q)) {
                        __set_current_state(TASK_RUNNING);
                        continue;
                }
index c0ac2fe07ec489152cd00f10ba8b40af8ea64bea..8ef6cd8defa45f47628e68f0012f201bbdebf346 100644 (file)
@@ -100,6 +100,7 @@ static struct rxrpc_local *rxrpc_alloc_local(struct net *net,
                init_rwsem(&local->defrag_sem);
                init_completion(&local->io_thread_ready);
                skb_queue_head_init(&local->rx_queue);
+               INIT_LIST_HEAD(&local->conn_attend_q);
                INIT_LIST_HEAD(&local->call_attend_q);
                local->client_bundles = RB_ROOT;
                spin_lock_init(&local->client_bundles_lock);