dlm: merge toss and keep hash table lists into one list
authorAlexander Aring <aahringo@redhat.com>
Mon, 15 Apr 2024 18:39:37 +0000 (14:39 -0400)
committerDavid Teigland <teigland@redhat.com>
Tue, 16 Apr 2024 18:49:13 +0000 (13:49 -0500)
There are several places where lock processing can perform two hash table
lookups, first in the "keep" list, and if not found, in the "toss" list.
This patch introduces a new rsb state flag "RSB_TOSS" to represent the
difference between the state of being on keep vs toss list, so that the
two lists can be combined.  This avoids cases of two lookups.

Signed-off-by: Alexander Aring <aahringo@redhat.com>
Signed-off-by: David Teigland <teigland@redhat.com>
fs/dlm/debug_fs.c
fs/dlm/dir.c
fs/dlm/dlm_internal.h
fs/dlm/lock.c
fs/dlm/lockspace.c
fs/dlm/recover.c
fs/dlm/recoverd.c

index b8234eba5e340a20668a45d5daa139e3916b48c4..37f4dfca5e44120b20e17536c6295447132ce955 100644 (file)
@@ -450,12 +450,20 @@ static void *table_seq_start(struct seq_file *seq, loff_t *pos)
        if (seq->op == &format4_seq_ops)
                ri->format = 4;
 
-       tree = toss ? &ls->ls_rsbtbl[bucket].toss : &ls->ls_rsbtbl[bucket].keep;
+       tree = &ls->ls_rsbtbl[bucket].r;
 
        spin_lock_bh(&ls->ls_rsbtbl_lock);
        if (!RB_EMPTY_ROOT(tree)) {
                for (node = rb_first(tree); node; node = rb_next(node)) {
                        r = rb_entry(node, struct dlm_rsb, res_hashnode);
+                       if (toss) {
+                               if (!rsb_flag(r, RSB_TOSS))
+                                       continue;
+                       } else {
+                               if (rsb_flag(r, RSB_TOSS))
+                                       continue;
+                       }
+
                        if (!entry--) {
                                dlm_hold_rsb(r);
                                ri->rsb = r;
@@ -482,12 +490,20 @@ static void *table_seq_start(struct seq_file *seq, loff_t *pos)
                        kfree(ri);
                        return NULL;
                }
-               tree = toss ? &ls->ls_rsbtbl[bucket].toss : &ls->ls_rsbtbl[bucket].keep;
+               tree = &ls->ls_rsbtbl[bucket].r;
 
                spin_lock_bh(&ls->ls_rsbtbl_lock);
                if (!RB_EMPTY_ROOT(tree)) {
                        node = rb_first(tree);
                        r = rb_entry(node, struct dlm_rsb, res_hashnode);
+                       if (toss) {
+                               if (!rsb_flag(r, RSB_TOSS))
+                                       continue;
+                       } else {
+                               if (rsb_flag(r, RSB_TOSS))
+                                       continue;
+                       }
+
                        dlm_hold_rsb(r);
                        ri->rsb = r;
                        ri->bucket = bucket;
@@ -548,12 +564,19 @@ static void *table_seq_next(struct seq_file *seq, void *iter_ptr, loff_t *pos)
                        ++*pos;
                        return NULL;
                }
-               tree = toss ? &ls->ls_rsbtbl[bucket].toss : &ls->ls_rsbtbl[bucket].keep;
+               tree = &ls->ls_rsbtbl[bucket].r;
 
                spin_lock_bh(&ls->ls_rsbtbl_lock);
                if (!RB_EMPTY_ROOT(tree)) {
                        next = rb_first(tree);
                        r = rb_entry(next, struct dlm_rsb, res_hashnode);
+                       if (toss) {
+                               if (!rsb_flag(r, RSB_TOSS))
+                                       continue;
+                       } else {
+                               if (rsb_flag(r, RSB_TOSS))
+                                       continue;
+                       }
                        dlm_hold_rsb(r);
                        ri->rsb = r;
                        ri->bucket = bucket;
index 5315f4f46cc7b81a65bb1fb192c5c34874bcecf1..f8039f3ee2d1698f830071cf98c8494f498a2412 100644 (file)
@@ -205,12 +205,8 @@ static struct dlm_rsb *find_rsb_root(struct dlm_ls *ls, const char *name,
        bucket = hash & (ls->ls_rsbtbl_size - 1);
 
        spin_lock_bh(&ls->ls_rsbtbl_lock);
-       rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].keep, name, len, &r);
-       if (rv)
-               rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].toss,
-                                        name, len, &r);
+       rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[bucket].r, name, len, &r);
        spin_unlock_bh(&ls->ls_rsbtbl_lock);
-
        if (!rv)
                return r;
 
index 2c961db53b27b2b3606623b5f360dbe1c15ce39c..af88fc2f978c296d7b3d2ab0c3d42613c29c5818 100644 (file)
@@ -103,8 +103,7 @@ do { \
 #define DLM_RTF_SHRINK_BIT     0
 
 struct dlm_rsbtable {
-       struct rb_root          keep;
-       struct rb_root          toss;
+       struct rb_root          r;
        unsigned long           flags;
 };
 
@@ -376,6 +375,7 @@ enum rsb_flags {
        RSB_RECOVER_CONVERT,
        RSB_RECOVER_GRANT,
        RSB_RECOVER_LVB_INVAL,
+       RSB_TOSS,
 };
 
 static inline void rsb_set_flag(struct dlm_rsb *r, enum rsb_flags flag)
index af57d9d1243495e8b28a1c9f4b07c8312a864539..08ec1a04476a71ab3d98f304e9700140e9efe9d5 100644 (file)
@@ -616,23 +616,22 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
 
        spin_lock_bh(&ls->ls_rsbtbl_lock);
 
-       error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
+       error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].r, name, len, &r);
        if (error)
-               goto do_toss;
+               goto do_new;
        
        /*
         * rsb is active, so we can't check master_nodeid without lock_rsb.
         */
 
+       if (rsb_flag(r, RSB_TOSS))
+               goto do_toss;
+
        kref_get(&r->res_ref);
        goto out_unlock;
 
 
  do_toss:
-       error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
-       if (error)
-               goto do_new;
-
        /*
         * rsb found inactive (master_nodeid may be out of date unless
         * we are the dir_nodeid or were the master)  No other thread
@@ -669,8 +668,7 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
                r->res_first_lkid = 0;
        }
 
-       rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
-       error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
+       rsb_clear_flag(r, RSB_TOSS);
        goto out_unlock;
 
 
@@ -731,7 +729,7 @@ static int find_rsb_dir(struct dlm_ls *ls, const void *name, int len,
        }
 
  out_add:
-       error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
+       error = rsb_insert(r, &ls->ls_rsbtbl[b].r);
  out_unlock:
        spin_unlock_bh(&ls->ls_rsbtbl_lock);
  out:
@@ -760,8 +758,11 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
 
        spin_lock_bh(&ls->ls_rsbtbl_lock);
 
-       error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
+       error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].r, name, len, &r);
        if (error)
+               goto do_new;
+
+       if (rsb_flag(r, RSB_TOSS))
                goto do_toss;
 
        /*
@@ -773,10 +774,6 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
 
 
  do_toss:
-       error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
-       if (error)
-               goto do_new;
-
        /*
         * rsb found inactive. No other thread is using this rsb because
         * it's on the toss list, so we can look at or update
@@ -804,8 +801,7 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
                r->res_nodeid = 0;
        }
 
-       rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
-       error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
+       rsb_clear_flag(r, RSB_TOSS);
        goto out_unlock;
 
 
@@ -829,7 +825,7 @@ static int find_rsb_nodir(struct dlm_ls *ls, const void *name, int len,
        r->res_nodeid = (dir_nodeid == our_nodeid) ? 0 : dir_nodeid;
        kref_init(&r->res_ref);
 
-       error = rsb_insert(r, &ls->ls_rsbtbl[b].keep);
+       error = rsb_insert(r, &ls->ls_rsbtbl[b].r);
  out_unlock:
        spin_unlock_bh(&ls->ls_rsbtbl_lock);
  out:
@@ -1049,8 +1045,11 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
                return error;
 
        spin_lock_bh(&ls->ls_rsbtbl_lock);
-       error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
+       error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].r, name, len, &r);
        if (!error) {
+               if (rsb_flag(r, RSB_TOSS))
+                       goto do_toss;
+
                /* because the rsb is active, we need to lock_rsb before
                 * checking/changing re_master_nodeid
                 */
@@ -1067,12 +1066,11 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
                put_rsb(r);
 
                return 0;
-       }
-
-       error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
-       if (error)
+       } else {
                goto not_found;
+       }
 
+ do_toss:
        /* because the rsb is inactive (on toss list), it's not refcounted
         * and lock_rsb is not used, but is protected by the rsbtbl lock
         */
@@ -1102,8 +1100,9 @@ int dlm_master_lookup(struct dlm_ls *ls, int from_nodeid, const char *name,
        r->res_nodeid = from_nodeid;
        kref_init(&r->res_ref);
        r->res_toss_time = jiffies;
+       rsb_set_flag(r, RSB_TOSS);
 
-       error = rsb_insert(r, &ls->ls_rsbtbl[b].toss);
+       error = rsb_insert(r, &ls->ls_rsbtbl[b].r);
        if (error) {
                /* should never happen */
                dlm_free_rsb(r);
@@ -1127,8 +1126,11 @@ static void dlm_dump_rsb_hash(struct dlm_ls *ls, uint32_t hash)
 
        spin_lock_bh(&ls->ls_rsbtbl_lock);
        for (i = 0; i < ls->ls_rsbtbl_size; i++) {
-               for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
+               for (n = rb_first(&ls->ls_rsbtbl[i].r); n; n = rb_next(n)) {
                        r = rb_entry(n, struct dlm_rsb, res_hashnode);
+                       if (rsb_flag(r, RSB_TOSS))
+                               continue;
+
                        if (r->res_hash == hash)
                                dlm_dump_rsb(r);
                }
@@ -1146,14 +1148,10 @@ void dlm_dump_rsb_name(struct dlm_ls *ls, const char *name, int len)
        b = hash & (ls->ls_rsbtbl_size - 1);
 
        spin_lock_bh(&ls->ls_rsbtbl_lock);
-       error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
+       error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].r, name, len, &r);
        if (!error)
-               goto out_dump;
-
-       error = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
-       if (error)
                goto out;
- out_dump:
+
        dlm_dump_rsb(r);
  out:
        spin_unlock_bh(&ls->ls_rsbtbl_lock);
@@ -1166,8 +1164,8 @@ static void toss_rsb(struct kref *kref)
 
        DLM_ASSERT(list_empty(&r->res_root_list), dlm_print_rsb(r););
        kref_init(&r->res_ref);
-       rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[r->res_bucket].keep);
-       rsb_insert(r, &ls->ls_rsbtbl[r->res_bucket].toss);
+       WARN_ON(rsb_flag(r, RSB_TOSS));
+       rsb_set_flag(r, RSB_TOSS);
        r->res_toss_time = jiffies;
        set_bit(DLM_RTF_SHRINK_BIT, &ls->ls_rsbtbl[r->res_bucket].flags);
        if (r->res_lvbptr) {
@@ -1627,9 +1625,11 @@ static void shrink_bucket(struct dlm_ls *ls, int b)
                return;
        }
 
-       for (n = rb_first(&ls->ls_rsbtbl[b].toss); n; n = next) {
+       for (n = rb_first(&ls->ls_rsbtbl[b].r); n; n = next) {
                next = rb_next(n);
                r = rb_entry(n, struct dlm_rsb, res_hashnode);
+               if (!rsb_flag(r, RSB_TOSS))
+                       continue;
 
                /* If we're the directory record for this rsb, and
                   we're not the master of it, then we need to wait
@@ -1672,7 +1672,7 @@ static void shrink_bucket(struct dlm_ls *ls, int b)
                        continue;
                }
 
-               rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
+               rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].r);
                dlm_free_rsb(r);
        }
 
@@ -1696,8 +1696,14 @@ static void shrink_bucket(struct dlm_ls *ls, int b)
                len = ls->ls_remove_lens[i];
 
                spin_lock_bh(&ls->ls_rsbtbl_lock);
-               rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
+               rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].r, name, len, &r);
                if (rv) {
+                       spin_unlock_bh(&ls->ls_rsbtbl_lock);
+                       log_error(ls, "remove_name not found %s", name);
+                       continue;
+               }
+
+               if (!rsb_flag(r, RSB_TOSS)) {
                        spin_unlock_bh(&ls->ls_rsbtbl_lock);
                        log_debug(ls, "remove_name not toss %s", name);
                        continue;
@@ -1734,7 +1740,7 @@ static void shrink_bucket(struct dlm_ls *ls, int b)
                        continue;
                }
 
-               rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
+               rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].r);
                send_remove(r);
                spin_unlock_bh(&ls->ls_rsbtbl_lock);
 
@@ -4202,17 +4208,16 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
 
        spin_lock_bh(&ls->ls_rsbtbl_lock);
 
-       rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].toss, name, len, &r);
+       rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].r, name, len, &r);
        if (rv) {
-               /* verify the rsb is on keep list per comment above */
-               rv = dlm_search_rsb_tree(&ls->ls_rsbtbl[b].keep, name, len, &r);
-               if (rv) {
-                       /* should not happen */
-                       log_error(ls, "receive_remove from %d not found %s",
-                                 from_nodeid, name);
-                       spin_unlock_bh(&ls->ls_rsbtbl_lock);
-                       return;
-               }
+               /* should not happen */
+               log_error(ls, "%s from %d not found %s", __func__,
+                         from_nodeid, name);
+               spin_unlock_bh(&ls->ls_rsbtbl_lock);
+               return;
+       }
+
+       if (!rsb_flag(r, RSB_TOSS)) {
                if (r->res_master_nodeid != from_nodeid) {
                        /* should not happen */
                        log_error(ls, "receive_remove keep from %d master %d",
@@ -4238,7 +4243,7 @@ static void receive_remove(struct dlm_ls *ls, const struct dlm_message *ms)
        }
 
        if (kref_put(&r->res_ref, kill_rsb)) {
-               rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].toss);
+               rb_erase(&r->res_hashnode, &ls->ls_rsbtbl[b].r);
                spin_unlock_bh(&ls->ls_rsbtbl_lock);
                dlm_free_rsb(r);
        } else {
@@ -5314,8 +5319,10 @@ static struct dlm_rsb *find_grant_rsb(struct dlm_ls *ls, int bucket)
        struct dlm_rsb *r;
 
        spin_lock_bh(&ls->ls_rsbtbl_lock);
-       for (n = rb_first(&ls->ls_rsbtbl[bucket].keep); n; n = rb_next(n)) {
+       for (n = rb_first(&ls->ls_rsbtbl[bucket].r); n; n = rb_next(n)) {
                r = rb_entry(n, struct dlm_rsb, res_hashnode);
+               if (rsb_flag(r, RSB_TOSS))
+                       continue;
 
                if (!rsb_flag(r, RSB_RECOVER_GRANT))
                        continue;
index d33dbcd5f4a1d3cc24b96dad3fc166b7818daf73..b5184ad550fa947802dfd66d737ff3d46d0b09de 100644 (file)
@@ -503,8 +503,7 @@ static int new_lockspace(const char *name, const char *cluster,
        if (!ls->ls_rsbtbl)
                goto out_lsfree;
        for (i = 0; i < size; i++) {
-               ls->ls_rsbtbl[i].keep.rb_node = NULL;
-               ls->ls_rsbtbl[i].toss.rb_node = NULL;
+               ls->ls_rsbtbl[i].r.rb_node = NULL;
        }
 
        for (i = 0; i < DLM_REMOVE_NAMES_MAX; i++) {
@@ -837,15 +836,9 @@ static int release_lockspace(struct dlm_ls *ls, int force)
         */
 
        for (i = 0; i < ls->ls_rsbtbl_size; i++) {
-               while ((n = rb_first(&ls->ls_rsbtbl[i].keep))) {
+               while ((n = rb_first(&ls->ls_rsbtbl[i].r))) {
                        rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
-                       rb_erase(n, &ls->ls_rsbtbl[i].keep);
-                       dlm_free_rsb(rsb);
-               }
-
-               while ((n = rb_first(&ls->ls_rsbtbl[i].toss))) {
-                       rsb = rb_entry(n, struct dlm_rsb, res_hashnode);
-                       rb_erase(n, &ls->ls_rsbtbl[i].toss);
+                       rb_erase(n, &ls->ls_rsbtbl[i].r);
                        dlm_free_rsb(rsb);
                }
        }
index 9a4c8e4b244225962b4f4850ca542aef1e928e1e..e53d88e4ec935302e7959d27df9a6a7af0d29ea5 100644 (file)
@@ -888,10 +888,13 @@ void dlm_clear_toss(struct dlm_ls *ls)
 
        spin_lock(&ls->ls_rsbtbl_lock);
        for (i = 0; i < ls->ls_rsbtbl_size; i++) {
-               for (n = rb_first(&ls->ls_rsbtbl[i].toss); n; n = next) {
+               for (n = rb_first(&ls->ls_rsbtbl[i].r); n; n = next) {
                        next = rb_next(n);
                        r = rb_entry(n, struct dlm_rsb, res_hashnode);
-                       rb_erase(n, &ls->ls_rsbtbl[i].toss);
+                       if (!rsb_flag(r, RSB_TOSS))
+                               continue;
+
+                       rb_erase(n, &ls->ls_rsbtbl[i].r);
                        dlm_free_rsb(r);
                        count++;
                }
index fa6608363302fcc48d7dbe17d50d6fee6e3a7228..ad696528ebe7b715903265924a1720b297f81051 100644 (file)
@@ -35,9 +35,9 @@ static int dlm_create_masters_list(struct dlm_ls *ls)
 
        spin_lock_bh(&ls->ls_rsbtbl_lock);
        for (i = 0; i < ls->ls_rsbtbl_size; i++) {
-               for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
+               for (n = rb_first(&ls->ls_rsbtbl[i].r); n; n = rb_next(n)) {
                        r = rb_entry(n, struct dlm_rsb, res_hashnode);
-                       if (r->res_nodeid)
+                       if (rsb_flag(r, RSB_TOSS) || r->res_nodeid)
                                continue;
 
                        list_add(&r->res_masters_list, &ls->ls_masters_list);
@@ -70,14 +70,14 @@ static void dlm_create_root_list(struct dlm_ls *ls, struct list_head *root_list)
 
        spin_lock_bh(&ls->ls_rsbtbl_lock);
        for (i = 0; i < ls->ls_rsbtbl_size; i++) {
-               for (n = rb_first(&ls->ls_rsbtbl[i].keep); n; n = rb_next(n)) {
+               for (n = rb_first(&ls->ls_rsbtbl[i].r); n; n = rb_next(n)) {
                        r = rb_entry(n, struct dlm_rsb, res_hashnode);
+                       if (WARN_ON_ONCE(rsb_flag(r, RSB_TOSS)))
+                               continue;
+
                        list_add(&r->res_root_list, root_list);
                        dlm_hold_rsb(r);
                }
-
-               if (!RB_EMPTY_ROOT(&ls->ls_rsbtbl[i].toss))
-                       log_error(ls, "%s toss not empty", __func__);
        }
        spin_unlock_bh(&ls->ls_rsbtbl_lock);
 }