RDS: Change send lock from a mutex to a spinlock
authorAndy Grover <andy.grover@oracle.com>
Wed, 24 Mar 2010 00:39:07 +0000 (17:39 -0700)
committerAndy Grover <andy.grover@oracle.com>
Thu, 9 Sep 2010 01:12:12 +0000 (18:12 -0700)
This change allows us to call rds_send_xmit() from a tasklet,
which is crucial to our new operating model.

* Change c_send_lock to a spinlock
* Update stats fields "sem_" to "_lock"
* Remove unneeded rds_conn_is_sending()

About locking between shutdown and send -- send checks if the
connection is up. Shutdown puts the connection into
DISCONNECTING. After this, all threads entering send will exit
immediately. However, a thread could be *in* send_xmit(), so
shutdown acquires the c_send_lock to ensure everyone is out
before proceeding with connection shutdown.

Signed-off-by: Andy Grover <andy.grover@oracle.com>
net/rds/connection.c
net/rds/rds.h
net/rds/send.c
net/rds/stats.c

index 88bcaf3f3e169e93543cf59d79eb47d6a3d482db..56aebe444ad35b165706ca84f2af8f90e8a212a0 100644 (file)
@@ -62,18 +62,6 @@ static struct hlist_head *rds_conn_bucket(__be32 laddr, __be32 faddr)
                var |= RDS_INFO_CONNECTION_FLAG_##suffix;       \
 } while (0)
 
-static inline int rds_conn_is_sending(struct rds_connection *conn)
-{
-       int ret = 0;
-
-       if (!mutex_trylock(&conn->c_send_lock))
-               ret = 1;
-       else
-               mutex_unlock(&conn->c_send_lock);
-
-       return ret;
-}
-
 static struct rds_connection *rds_conn_lookup(struct hlist_head *head,
                                              __be32 laddr, __be32 faddr,
                                              struct rds_transport *trans)
@@ -158,7 +146,7 @@ static struct rds_connection *__rds_conn_create(__be32 laddr, __be32 faddr,
        spin_lock_init(&conn->c_lock);
        conn->c_next_tx_seq = 1;
 
-       mutex_init(&conn->c_send_lock);
+       spin_lock_init(&conn->c_send_lock);
        INIT_LIST_HEAD(&conn->c_send_queue);
        INIT_LIST_HEAD(&conn->c_retrans);
 
@@ -283,10 +271,12 @@ void rds_conn_shutdown(struct rds_connection *conn)
                }
                mutex_unlock(&conn->c_cm_lock);
 
-               mutex_lock(&conn->c_send_lock);
+               /* verify everybody's out of rds_send_xmit() */
+               spin_lock_irq(&conn->c_send_lock);
+               spin_unlock_irq(&conn->c_send_lock);
+
                conn->c_trans->conn_shutdown(conn);
                rds_conn_reset(conn);
-               mutex_unlock(&conn->c_send_lock);
 
                if (!rds_conn_transition(conn, RDS_CONN_DISCONNECTING, RDS_CONN_DOWN)) {
                        /* This can happen - eg when we're in the middle of tearing
@@ -476,7 +466,7 @@ static int rds_conn_info_visitor(struct rds_connection *conn,
        cinfo->flags = 0;
 
        rds_conn_info_set(cinfo->flags,
-                         rds_conn_is_sending(conn), SENDING);
+                         spin_is_locked(&conn->c_send_lock), SENDING);
        /* XXX Future: return the state rather than these funky bits */
        rds_conn_info_set(cinfo->flags,
                          atomic_read(&conn->c_state) == RDS_CONN_CONNECTING,
index e81d7e47847477c1216f9c815be8daa9e8101e28..c3a668b9cc1467c40cf177636434cac6dbeaea06 100644 (file)
@@ -91,7 +91,7 @@ struct rds_connection {
        struct rds_cong_map     *c_lcong;
        struct rds_cong_map     *c_fcong;
 
-       struct mutex            c_send_lock;    /* protect send ring */
+       spinlock_t              c_send_lock;    /* protect send ring */
        struct rds_message      *c_xmit_rm;
        unsigned long           c_xmit_sg;
        unsigned int            c_xmit_hdr_off;
@@ -548,8 +548,8 @@ struct rds_statistics {
        uint64_t        s_recv_ping;
        uint64_t        s_send_queue_empty;
        uint64_t        s_send_queue_full;
-       uint64_t        s_send_sem_contention;
-       uint64_t        s_send_sem_queue_raced;
+       uint64_t        s_send_lock_contention;
+       uint64_t        s_send_lock_queue_raced;
        uint64_t        s_send_immediate_retry;
        uint64_t        s_send_delayed_retry;
        uint64_t        s_send_drop_acked;
index 8a0647af5d95a4557644a190c767fce60a3c6a2b..d4feec6ad09c475caf63217f9bcd5d948a114616 100644 (file)
@@ -116,19 +116,18 @@ int rds_send_xmit(struct rds_connection *conn)
        int was_empty = 0;
        LIST_HEAD(to_be_dropped);
 
+       if (!rds_conn_up(conn))
+               goto out;
+
        /*
         * sendmsg calls here after having queued its message on the send
         * queue.  We only have one task feeding the connection at a time.  If
         * another thread is already feeding the queue then we back off.  This
         * avoids blocking the caller and trading per-connection data between
         * caches per message.
-        *
-        * The sem holder will issue a retry if they notice that someone queued
-        * a message after they stopped walking the send queue but before they
-        * dropped the sem.
         */
-       if (!mutex_trylock(&conn->c_send_lock)) {
-               rds_stats_inc(s_send_sem_contention);
+       if (!spin_trylock_irqsave(&conn->c_send_lock, flags)) {
+               rds_stats_inc(s_send_lock_contention);
                ret = -ENOMEM;
                goto out;
        }
@@ -346,7 +345,7 @@ int rds_send_xmit(struct rds_connection *conn)
         * stop processing the loop when the transport hasn't taken
         * responsibility for forward progress.
         */
-       mutex_unlock(&conn->c_send_lock);
+       spin_unlock_irqrestore(&conn->c_send_lock, flags);
 
        if (send_quota == 0 && !was_empty) {
                /* We exhausted the send quota, but there's work left to
@@ -360,7 +359,7 @@ int rds_send_xmit(struct rds_connection *conn)
                 * spin lock */
                spin_lock_irqsave(&conn->c_lock, flags);
                if (!list_empty(&conn->c_send_queue)) {
-                       rds_stats_inc(s_send_sem_queue_raced);
+                       rds_stats_inc(s_send_lock_queue_raced);
                        ret = -EAGAIN;
                }
                spin_unlock_irqrestore(&conn->c_lock, flags);
index c66d95d9c262682de2a381a66688a02e9439adb8..b77be8be33ba88aec6f56c1ccda4bfe713c3c7e1 100644 (file)
@@ -57,8 +57,8 @@ static const char *const rds_stat_names[] = {
        "recv_ping",
        "send_queue_empty",
        "send_queue_full",
-       "send_sem_contention",
-       "send_sem_queue_raced",
+       "send_lock_contention",
+       "send_lock_queue_raced",
        "send_immediate_retry",
        "send_delayed_retry",
        "send_drop_acked",