diff --git a/flow.c b/flow.c
index a8dbfb9bb48e66a991953810d2c25d92eeeacb54..c64bb3b27559053d27a79fa2c9a45f68de6e71c2 100644 (file)
--- a/flow.c
+++ b/flow.c
@@ -5,9 +5,10 @@
 
 struct fio_flow {
        unsigned int refs;
-       struct flist_head list;
        unsigned int id;
-       long long int flow_counter;
+       struct flist_head list;
+       unsigned long flow_counter;
+       unsigned int total_weight;
 };
 
 static struct flist_head *flow_list;
@@ -16,28 +17,40 @@ static struct fio_sem *flow_lock;
 int flow_threshold_exceeded(struct thread_data *td)
 {
        struct fio_flow *flow = td->flow;
-       long long flow_counter;
+       double flow_counter_ratio, flow_weight_ratio;
 
        if (!flow)
                return 0;
 
-       if (td->o.flow > 0)
-               flow_counter = flow->flow_counter;
-       else
-               flow_counter = -flow->flow_counter;
-
-       if (flow_counter > td->o.flow_watermark) {
+       flow_counter_ratio = (double)td->flow_counter /
+               atomic_load_relaxed(&flow->flow_counter);
+       flow_weight_ratio = (double)td->o.flow /
+               atomic_load_relaxed(&flow->total_weight);
+
+       /*
+        * Each thread/process executing a fio job stalls based on its
+        * user-specified weight for the flow_id group. The idea is to keep two
+        * counters, a shared flow counter and a job-specific one, and to stall
+        * whenever this job's share of the I/O exceeds its share of the weight.
+        */
+       if (flow_counter_ratio > flow_weight_ratio) {
                if (td->o.flow_sleep) {
                        io_u_quiesce(td);
                        usleep(td->o.flow_sleep);
+               } else if (td->o.zone_mode == ZONE_MODE_ZBD) {
+                       io_u_quiesce(td);
                }
 
                return 1;
        }
 
-       /* No synchronization needed because it doesn't
-        * matter if the flow count is slightly inaccurate */
-       flow->flow_counter += td->o.flow;
+       /*
+        * Increment the flow counter (shared, therefore updated atomically)
+        * and the job-specific counter.
+        */
+       atomic_add(&flow->flow_counter, 1);
+       ++td->flow_counter;
+
        return 0;
 }
 
@@ -68,7 +81,8 @@ static struct fio_flow *flow_get(unsigned int id)
                flow->refs = 0;
                INIT_FLIST_HEAD(&flow->list);
                flow->id = id;
-               flow->flow_counter = 0;
+               flow->flow_counter = 1;
+               flow->total_weight = 0;
 
                flist_add_tail(&flow->list, flow_list);
        }
@@ -78,14 +92,19 @@ static struct fio_flow *flow_get(unsigned int id)
        return flow;
 }
 
-static void flow_put(struct fio_flow *flow)
+static void flow_put(struct fio_flow *flow, unsigned long flow_counter,
+                                       unsigned int weight)
 {
        if (!flow_lock)
                return;
 
        fio_sem_down(flow_lock);
 
+       atomic_sub(&flow->flow_counter, flow_counter);
+       atomic_sub(&flow->total_weight, weight);
+
        if (!--flow->refs) {
+               assert(flow->flow_counter == 1);
                flist_del(&flow->list);
                sfree(flow);
        }
@@ -95,14 +114,17 @@ static void flow_put(struct fio_flow *flow)
 
 void flow_init_job(struct thread_data *td)
 {
-       if (td->o.flow)
+       if (td->o.flow) {
                td->flow = flow_get(td->o.flow_id);
+               td->flow_counter = 0;
+               atomic_add(&td->flow->total_weight, td->o.flow);
+       }
 }
 
 void flow_exit_job(struct thread_data *td)
 {
        if (td->flow) {
-               flow_put(td->flow);
+               flow_put(td->flow, td->flow_counter, td->o.flow);
                td->flow = NULL;
        }
 }
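
To make the new check concrete, here is a minimal standalone sketch (not part of the patch; the struct, helper name, and numbers are hypothetical) of the proportional comparison that flow_threshold_exceeded() performs after this change: a job stalls whenever its share of the group's issued I/O exceeds its share of the group's total weight.

#include <stdio.h>

struct job {
	unsigned long issued;	/* I/Os this job has issued (cf. td->flow_counter) */
	unsigned int weight;	/* this job's weight (cf. td->o.flow) */
};

static int should_stall(const struct job *j, unsigned long group_issued,
			unsigned int group_weight)
{
	double counter_ratio = (double)j->issued / group_issued;
	double weight_ratio = (double)j->weight / group_weight;

	/* same test as flow_threshold_exceeded(): stall when ahead of our share */
	return counter_ratio > weight_ratio;
}

int main(void)
{
	/* two hypothetical jobs sharing a flow_id, with weights 8 and 1 */
	struct job a = { .issued = 80, .weight = 8 };
	struct job b = { .issued = 20, .weight = 1 };
	/* the shared counter starts at 1 in flow_get(), so the divisor is never zero */
	unsigned long group_issued = 1 + a.issued + b.issued;
	unsigned int group_weight = a.weight + b.weight;

	printf("a stalls: %d\n", should_stall(&a, group_issued, group_weight));
	printf("b stalls: %d\n", should_stall(&b, group_issued, group_weight));
	return 0;
}

With these numbers, job a (weight 8) is still below its entitlement and proceeds, while job b (weight 1) has issued more than its share and stalls. The net effect of the patch is that throughput within a flow_id group is split roughly in proportion to each job's flow value, rather than being gated by the old signed flow_watermark counter.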