gfs2: Clean up glock work enqueuing
authorAndreas Gruenbacher <agruenba@redhat.com>
Fri, 30 Jun 2017 13:10:01 +0000 (08:10 -0500)
committerBob Peterson <rpeterso@redhat.com>
Wed, 5 Jul 2017 12:21:00 +0000 (07:21 -0500)
This patch adds a standardized queueing mechanism for glock work
with spin_lock protection to prevent races.

Signed-off-by: Andreas Gruenbacher <agruenba@redhat.com>
Signed-off-by: Bob Peterson <rpeterso@redhat.com>
fs/gfs2/glock.c

index 959a19ced4d5f83fccf7e82c88a5855f3d1d26cc..6cd71c50b8bdc7f21a90e4eb29c1ff5162b455ff 100644 (file)
@@ -152,20 +152,34 @@ static void gfs2_glock_remove_from_lru(struct gfs2_glock *gl)
        spin_unlock(&lru_lock);
 }
 
-/**
- * gfs2_glock_put() - Decrement reference count on glock
- * @gl: The glock to put
- *
+/*
+ * Enqueue the glock on the work queue.  Passes one glock reference on to the
+ * work queue.
  */
+static void __gfs2_glock_queue_work(struct gfs2_glock *gl, unsigned long delay) {
+       if (!queue_delayed_work(glock_workqueue, &gl->gl_work, delay)) {
+               /*
+                * We are holding the lockref spinlock, and the work was still
+                * queued above.  The queued work (glock_work_func) takes that
+                * spinlock before dropping its glock reference(s), so it
+                * cannot have dropped them in the meantime.
+                */
+               GLOCK_BUG_ON(gl, gl->gl_lockref.count < 2);
+               gl->gl_lockref.count--;
+       }
+}
 
-void gfs2_glock_put(struct gfs2_glock *gl)
+static void gfs2_glock_queue_work(struct gfs2_glock *gl, unsigned long delay) {
+       spin_lock(&gl->gl_lockref.lock);
+       __gfs2_glock_queue_work(gl, delay);
+       spin_unlock(&gl->gl_lockref.lock);
+}
+
+static void __gfs2_glock_put(struct gfs2_glock *gl)
 {
        struct gfs2_sbd *sdp = gl->gl_name.ln_sbd;
        struct address_space *mapping = gfs2_glock2aspace(gl);
 
-       if (lockref_put_or_lock(&gl->gl_lockref))
-               return;
-
        lockref_mark_dead(&gl->gl_lockref);
 
        gfs2_glock_remove_from_lru(gl);
@@ -177,6 +191,20 @@ void gfs2_glock_put(struct gfs2_glock *gl)
        sdp->sd_lockstruct.ls_ops->lm_put_lock(gl);
 }
 
+/**
+ * gfs2_glock_put() - Decrement reference count on glock
+ * @gl: The glock to put
+ *
+ */
+
+void gfs2_glock_put(struct gfs2_glock *gl)
+{
+       if (lockref_put_or_lock(&gl->gl_lockref))
+               return;
+
+       __gfs2_glock_put(gl);
+}
+
 /**
  * may_grant - check if its ok to grant a new lock
  * @gl: The glock
@@ -482,8 +510,7 @@ __acquires(&gl->gl_lockref.lock)
                    target == LM_ST_UNLOCKED &&
                    test_bit(SDF_SKIP_DLM_UNLOCK, &sdp->sd_flags)) {
                        finish_xmote(gl, target);
-                       if (queue_delayed_work(glock_workqueue, &gl->gl_work, 0) == 0)
-                               gfs2_glock_put(gl);
+                       gfs2_glock_queue_work(gl, 0);
                }
                else if (ret) {
                        pr_err("lm_lock ret %d\n", ret);
@@ -492,8 +519,7 @@ __acquires(&gl->gl_lockref.lock)
                }
        } else { /* lock_nolock */
                finish_xmote(gl, target);
-               if (queue_delayed_work(glock_workqueue, &gl->gl_work, 0) == 0)
-                       gfs2_glock_put(gl);
+               gfs2_glock_queue_work(gl, 0);
        }
 
        spin_lock(&gl->gl_lockref.lock);
@@ -565,8 +591,7 @@ out_sched:
        clear_bit(GLF_LOCK, &gl->gl_flags);
        smp_mb__after_atomic();
        gl->gl_lockref.count++;
-       if (queue_delayed_work(glock_workqueue, &gl->gl_work, 0) == 0)
-               gl->gl_lockref.count--;
+       __gfs2_glock_queue_work(gl, 0);
        return;
 
 out_unlock:
@@ -601,11 +626,11 @@ static void glock_work_func(struct work_struct *work)
 {
        unsigned long delay = 0;
        struct gfs2_glock *gl = container_of(work, struct gfs2_glock, gl_work.work);
-       int drop_ref = 0;
+       unsigned int drop_refs = 1;
 
        if (test_and_clear_bit(GLF_REPLY_PENDING, &gl->gl_flags)) {
                finish_xmote(gl, gl->gl_reply);
-               drop_ref = 1;
+               drop_refs++;
        }
        spin_lock(&gl->gl_lockref.lock);
        if (test_bit(GLF_PENDING_DEMOTE, &gl->gl_flags) &&
@@ -623,17 +648,25 @@ static void glock_work_func(struct work_struct *work)
                }
        }
        run_queue(gl, 0);
-       spin_unlock(&gl->gl_lockref.lock);
-       if (!delay)
-               gfs2_glock_put(gl);
-       else {
+       if (delay) {
+               /* Keep one glock reference for the work we requeue. */
+               drop_refs--;
                if (gl->gl_name.ln_type != LM_TYPE_INODE)
                        delay = 0;
-               if (queue_delayed_work(glock_workqueue, &gl->gl_work, delay) == 0)
-                       gfs2_glock_put(gl);
+               __gfs2_glock_queue_work(gl, delay);
        }
-       if (drop_ref)
-               gfs2_glock_put(gl);
+
+       /*
+        * Drop the remaining glock references manually here. (Mind that
+        * __gfs2_glock_queue_work depends on the lockref spinlock begin held
+        * here as well.)
+        */
+       gl->gl_lockref.count -= drop_refs;
+       if (!gl->gl_lockref.count) {
+               __gfs2_glock_put(gl);
+               return;
+       }
+       spin_unlock(&gl->gl_lockref.lock);
 }
 
 /**
@@ -986,8 +1019,7 @@ int gfs2_glock_nq(struct gfs2_holder *gh)
                     test_and_clear_bit(GLF_FROZEN, &gl->gl_flags))) {
                set_bit(GLF_REPLY_PENDING, &gl->gl_flags);
                gl->gl_lockref.count++;
-               if (queue_delayed_work(glock_workqueue, &gl->gl_work, 0) == 0)
-                       gl->gl_lockref.count--;
+               __gfs2_glock_queue_work(gl, 0);
        }
        run_queue(gl, 1);
        spin_unlock(&gl->gl_lockref.lock);
@@ -1047,17 +1079,15 @@ void gfs2_glock_dq(struct gfs2_holder *gh)
                gfs2_glock_add_to_lru(gl);
 
        trace_gfs2_glock_queue(gh, 0);
+       if (unlikely(!fast_path)) {
+               gl->gl_lockref.count++;
+               if (test_bit(GLF_PENDING_DEMOTE, &gl->gl_flags) &&
+                   !test_bit(GLF_DEMOTE, &gl->gl_flags) &&
+                   gl->gl_name.ln_type == LM_TYPE_INODE)
+                       delay = gl->gl_hold_time;
+               __gfs2_glock_queue_work(gl, delay);
+       }
        spin_unlock(&gl->gl_lockref.lock);
-       if (likely(fast_path))
-               return;
-
-       gfs2_glock_hold(gl);
-       if (test_bit(GLF_PENDING_DEMOTE, &gl->gl_flags) &&
-           !test_bit(GLF_DEMOTE, &gl->gl_flags) &&
-           gl->gl_name.ln_type == LM_TYPE_INODE)
-               delay = gl->gl_hold_time;
-       if (queue_delayed_work(glock_workqueue, &gl->gl_work, delay) == 0)
-               gfs2_glock_put(gl);
 }
 
 void gfs2_glock_dq_wait(struct gfs2_holder *gh)
@@ -1233,9 +1263,8 @@ void gfs2_glock_cb(struct gfs2_glock *gl, unsigned int state)
 
        spin_lock(&gl->gl_lockref.lock);
        handle_callback(gl, state, delay, true);
+       __gfs2_glock_queue_work(gl, delay);
        spin_unlock(&gl->gl_lockref.lock);
-       if (queue_delayed_work(glock_workqueue, &gl->gl_work, delay) == 0)
-               gfs2_glock_put(gl);
 }
 
 /**
@@ -1294,10 +1323,8 @@ void gfs2_glock_complete(struct gfs2_glock *gl, int ret)
 
        gl->gl_lockref.count++;
        set_bit(GLF_REPLY_PENDING, &gl->gl_flags);
+       __gfs2_glock_queue_work(gl, 0);
        spin_unlock(&gl->gl_lockref.lock);
-
-       if (queue_delayed_work(glock_workqueue, &gl->gl_work, 0) == 0)
-               gfs2_glock_put(gl);
 }
 
 static int glock_cmp(void *priv, struct list_head *a, struct list_head *b)
@@ -1355,8 +1382,7 @@ add_back_to_lru:
                if (demote_ok(gl))
                        handle_callback(gl, LM_ST_UNLOCKED, 0, false);
                WARN_ON(!test_and_clear_bit(GLF_LOCK, &gl->gl_flags));
-               if (queue_delayed_work(glock_workqueue, &gl->gl_work, 0) == 0)
-                       gl->gl_lockref.count--;
+               __gfs2_glock_queue_work(gl, 0);
                spin_unlock(&gl->gl_lockref.lock);
                cond_resched_lock(&lru_lock);
        }
@@ -1462,13 +1488,12 @@ static void glock_hash_walk(glock_examiner examiner, const struct gfs2_sbd *sdp)
 
 static void thaw_glock(struct gfs2_glock *gl)
 {
-       if (!test_and_clear_bit(GLF_FROZEN, &gl->gl_flags))
-               goto out;
-       set_bit(GLF_REPLY_PENDING, &gl->gl_flags);
-       if (queue_delayed_work(glock_workqueue, &gl->gl_work, 0) == 0) {
-out:
+       if (!test_and_clear_bit(GLF_FROZEN, &gl->gl_flags)) {
                gfs2_glock_put(gl);
+               return;
        }
+       set_bit(GLF_REPLY_PENDING, &gl->gl_flags);
+       gfs2_glock_queue_work(gl, 0);
 }
 
 /**
@@ -1484,9 +1509,8 @@ static void clear_glock(struct gfs2_glock *gl)
        spin_lock(&gl->gl_lockref.lock);
        if (gl->gl_state != LM_ST_UNLOCKED)
                handle_callback(gl, LM_ST_UNLOCKED, 0, false);
+       __gfs2_glock_queue_work(gl, 0);
        spin_unlock(&gl->gl_lockref.lock);
-       if (queue_delayed_work(glock_workqueue, &gl->gl_work, 0) == 0)
-               gfs2_glock_put(gl);
 }
 
 /**