bpf, cpumap: Use queue_rcu_work() to remove unnecessary rcu_barrier()
authorHou Tao <houtao1@huawei.com>
Wed, 16 Aug 2023 04:59:57 +0000 (12:59 +0800)
committerAlexei Starovoitov <ast@kernel.org>
Mon, 21 Aug 2023 22:21:16 +0000 (15:21 -0700)
As for now __cpu_map_entry_replace() uses call_rcu() to wait for the
inflight xdp program to exit the RCU read critical section, and then
launch kworker cpu_map_kthread_stop() to call kthread_stop() to flush
all pending xdp frames or skbs.

But it is unnecessary to use rcu_barrier() in cpu_map_kthread_stop() to
wait for the completion of __cpu_map_entry_free(), because rcu_barrier()
will wait for all pending RCU callbacks and cpu_map_kthread_stop() only
needs to wait for the completion of a specific __cpu_map_entry_free().

So use queue_rcu_work() to replace call_rcu(), schedule_work() and
rcu_barrier(). queue_rcu_work() will queue a __cpu_map_entry_free()
kworker after a RCU grace period. Because __cpu_map_entry_free() is
running in a kworker context, so it is OK to do all of these freeing
procedures include kthread_stop() in it.

After the update, there is no need to do reference-counting for
bpf_cpu_map_entry, because bpf_cpu_map_entry is freed directly in
__cpu_map_entry_free(), so just remove it.

Signed-off-by: Hou Tao <houtao1@huawei.com>
Reviewed-by: Toke Høiland-Jørgensen <toke@redhat.com>
Link: https://lore.kernel.org/r/20230816045959.358059-2-houtao@huaweicloud.com
Signed-off-by: Alexei Starovoitov <ast@kernel.org>
kernel/bpf/cpumap.c

index ef28c64f1eb13d98ce69403dbbd28b7a5674d2c1..93fbd3e5079e47f5e3ab80c73ea46d8da713e1b6 100644 (file)
@@ -68,11 +68,8 @@ struct bpf_cpu_map_entry {
        struct bpf_cpumap_val value;
        struct bpf_prog *prog;
 
-       atomic_t refcnt; /* Control when this struct can be free'ed */
-       struct rcu_head rcu;
-
-       struct work_struct kthread_stop_wq;
        struct completion kthread_running;
+       struct rcu_work free_work;
 };
 
 struct bpf_cpu_map {
@@ -117,11 +114,6 @@ static struct bpf_map *cpu_map_alloc(union bpf_attr *attr)
        return &cmap->map;
 }
 
-static void get_cpu_map_entry(struct bpf_cpu_map_entry *rcpu)
-{
-       atomic_inc(&rcpu->refcnt);
-}
-
 static void __cpu_map_ring_cleanup(struct ptr_ring *ring)
 {
        /* The tear-down procedure should have made sure that queue is
@@ -142,35 +134,6 @@ static void __cpu_map_ring_cleanup(struct ptr_ring *ring)
        }
 }
 
-static void put_cpu_map_entry(struct bpf_cpu_map_entry *rcpu)
-{
-       if (atomic_dec_and_test(&rcpu->refcnt)) {
-               if (rcpu->prog)
-                       bpf_prog_put(rcpu->prog);
-               /* The queue should be empty at this point */
-               __cpu_map_ring_cleanup(rcpu->queue);
-               ptr_ring_cleanup(rcpu->queue, NULL);
-               kfree(rcpu->queue);
-               kfree(rcpu);
-       }
-}
-
-/* called from workqueue, to workaround syscall using preempt_disable */
-static void cpu_map_kthread_stop(struct work_struct *work)
-{
-       struct bpf_cpu_map_entry *rcpu;
-
-       rcpu = container_of(work, struct bpf_cpu_map_entry, kthread_stop_wq);
-
-       /* Wait for flush in __cpu_map_entry_free(), via full RCU barrier,
-        * as it waits until all in-flight call_rcu() callbacks complete.
-        */
-       rcu_barrier();
-
-       /* kthread_stop will wake_up_process and wait for it to complete */
-       kthread_stop(rcpu->kthread);
-}
-
 static void cpu_map_bpf_prog_run_skb(struct bpf_cpu_map_entry *rcpu,
                                     struct list_head *listp,
                                     struct xdp_cpumap_stats *stats)
@@ -395,7 +358,6 @@ static int cpu_map_kthread_run(void *data)
        }
        __set_current_state(TASK_RUNNING);
 
-       put_cpu_map_entry(rcpu);
        return 0;
 }
 
@@ -472,9 +434,6 @@ __cpu_map_entry_alloc(struct bpf_map *map, struct bpf_cpumap_val *value,
        if (IS_ERR(rcpu->kthread))
                goto free_prog;
 
-       get_cpu_map_entry(rcpu); /* 1-refcnt for being in cmap->cpu_map[] */
-       get_cpu_map_entry(rcpu); /* 1-refcnt for kthread */
-
        /* Make sure kthread runs on a single CPU */
        kthread_bind(rcpu->kthread, cpu);
        wake_up_process(rcpu->kthread);
@@ -501,40 +460,40 @@ free_rcu:
        return NULL;
 }
 
-static void __cpu_map_entry_free(struct rcu_head *rcu)
+static void __cpu_map_entry_free(struct work_struct *work)
 {
        struct bpf_cpu_map_entry *rcpu;
 
        /* This cpu_map_entry have been disconnected from map and one
-        * RCU grace-period have elapsed.  Thus, XDP cannot queue any
+        * RCU grace-period have elapsed. Thus, XDP cannot queue any
         * new packets and cannot change/set flush_needed that can
         * find this entry.
         */
-       rcpu = container_of(rcu, struct bpf_cpu_map_entry, rcu);
+       rcpu = container_of(to_rcu_work(work), struct bpf_cpu_map_entry, free_work);
+
+       /* kthread_stop will wake_up_process and wait for it to complete.
+        * cpu_map_kthread_run() makes sure the pointer ring is empty
+        * before exiting.
+        */
+       kthread_stop(rcpu->kthread);
 
+       if (rcpu->prog)
+               bpf_prog_put(rcpu->prog);
+       /* The queue should be empty at this point */
+       __cpu_map_ring_cleanup(rcpu->queue);
+       ptr_ring_cleanup(rcpu->queue, NULL);
+       kfree(rcpu->queue);
        free_percpu(rcpu->bulkq);
-       /* Cannot kthread_stop() here, last put free rcpu resources */
-       put_cpu_map_entry(rcpu);
+       kfree(rcpu);
 }
 
-/* After xchg pointer to bpf_cpu_map_entry, use the call_rcu() to
- * ensure any driver rcu critical sections have completed, but this
- * does not guarantee a flush has happened yet. Because driver side
- * rcu_read_lock/unlock only protects the running XDP program.  The
- * atomic xchg and NULL-ptr check in __cpu_map_flush() makes sure a
- * pending flush op doesn't fail.
- *
- * The bpf_cpu_map_entry is still used by the kthread, and there can
- * still be pending packets (in queue and percpu bulkq).  A refcnt
- * makes sure to last user (kthread_stop vs. call_rcu) free memory
- * resources.
- *
- * The rcu callback __cpu_map_entry_free flush remaining packets in
- * percpu bulkq to queue.  Due to caller map_delete_elem() disable
- * preemption, cannot call kthread_stop() to make sure queue is empty.
- * Instead a work_queue is started for stopping kthread,
- * cpu_map_kthread_stop, which waits for an RCU grace period before
- * stopping kthread, emptying the queue.
+/* After the xchg of the bpf_cpu_map_entry pointer, we need to make sure the old
+ * entry is no longer in use before freeing. We use queue_rcu_work() to call
+ * __cpu_map_entry_free() in a separate workqueue after waiting for an RCU grace
+ * period. This means that (a) all pending enqueue and flush operations have
+ * completed (because of the RCU callback), and (b) we are in a workqueue
+ * context where we can stop the kthread and wait for it to exit before freeing
+ * everything.
  */
 static void __cpu_map_entry_replace(struct bpf_cpu_map *cmap,
                                    u32 key_cpu, struct bpf_cpu_map_entry *rcpu)
@@ -543,9 +502,8 @@ static void __cpu_map_entry_replace(struct bpf_cpu_map *cmap,
 
        old_rcpu = unrcu_pointer(xchg(&cmap->cpu_map[key_cpu], RCU_INITIALIZER(rcpu)));
        if (old_rcpu) {
-               call_rcu(&old_rcpu->rcu, __cpu_map_entry_free);
-               INIT_WORK(&old_rcpu->kthread_stop_wq, cpu_map_kthread_stop);
-               schedule_work(&old_rcpu->kthread_stop_wq);
+               INIT_RCU_WORK(&old_rcpu->free_work, __cpu_map_entry_free);
+               queue_rcu_work(system_wq, &old_rcpu->free_work);
        }
 }
 
@@ -557,7 +515,7 @@ static long cpu_map_delete_elem(struct bpf_map *map, void *key)
        if (key_cpu >= map->max_entries)
                return -EINVAL;
 
-       /* notice caller map_delete_elem() use preempt_disable() */
+       /* notice caller map_delete_elem() uses rcu_read_lock() */
        __cpu_map_entry_replace(cmap, key_cpu, NULL);
        return 0;
 }