SUNRPC: Add a server side per-connection limit
authorTrond Myklebust <trond.myklebust@primarydata.com>
Fri, 24 Jun 2016 14:55:50 +0000 (10:55 -0400)
committerJ. Bruce Fields <bfields@redhat.com>
Wed, 13 Jul 2016 19:53:48 +0000 (15:53 -0400)
Allow the user to limit the number of requests serviced through a single
connection, to help prevent faster clients from starving slower clients.

Signed-off-by: Trond Myklebust <trond.myklebust@primarydata.com>
Signed-off-by: J. Bruce Fields <bfields@redhat.com>
Documentation/kernel-parameters.txt
include/linux/sunrpc/svc.h
include/linux/sunrpc/svc_xprt.h
net/sunrpc/svc_xprt.c

index 82b42c958d1c7def4eac5c9931a0e8a6f9aab6c6..48ba6d2e670a4c3d1c2d4e5644a29a2ae673a733 100644 (file)
@@ -3832,6 +3832,12 @@ bytes respectively. Such letter suffixes can also be entirely omitted.
                        using these two parameters to set the minimum and
                        maximum port values.
 
+       sunrpc.svc_rpc_per_connection_limit=
+                       [NFS,SUNRPC]
+                       Limit the number of requests that the server will
+                       process in parallel from a single connection.
+                       The default value is 0 (no limit).
+
        sunrpc.pool_mode=
                        [NFS]
                        Control how the NFS server code allocates CPUs to
index 7ca44fb5b675d1c078a2a2f121056606282dad58..7321ae933867566013a250623564d722d2800305 100644 (file)
@@ -268,6 +268,7 @@ struct svc_rqst {
                                                 * cache pages */
 #define        RQ_VICTIM       (5)                     /* about to be shut down */
 #define        RQ_BUSY         (6)                     /* request is busy */
+#define        RQ_DATA         (7)                     /* request has data */
        unsigned long           rq_flags;       /* flags field */
 
        void *                  rq_argp;        /* decoded arguments */
index 79ba50856707b9b9a619be78335b27b383312ed9..ad899ffed3beed8ed53b0d735920f5ef142e5db4 100644 (file)
@@ -69,6 +69,7 @@ struct svc_xprt {
 
        struct svc_serv         *xpt_server;    /* service for transport */
        atomic_t                xpt_reserved;   /* space on outq that is rsvd */
+       atomic_t                xpt_nr_rqsts;   /* Number of requests */
        struct mutex            xpt_mutex;      /* to serialize sending data */
        spinlock_t              xpt_lock;       /* protects sk_deferred
                                                 * and xpt_auth_cache */
index e7082a4aeb56e4e7003d96ee38775ec2acca48c7..2adc8db6aaf5224b8d384cb34066d314ccf156e1 100644 (file)
 
 #define RPCDBG_FACILITY        RPCDBG_SVCXPRT
 
+static unsigned int svc_rpc_per_connection_limit __read_mostly;
+module_param(svc_rpc_per_connection_limit, uint, 0644);
+
+
 static struct svc_deferred_req *svc_deferred_dequeue(struct svc_xprt *xprt);
 static int svc_deferred_recv(struct svc_rqst *rqstp);
 static struct cache_deferred_req *svc_defer(struct cache_req *req);
@@ -329,12 +333,41 @@ char *svc_print_addr(struct svc_rqst *rqstp, char *buf, size_t len)
 }
 EXPORT_SYMBOL_GPL(svc_print_addr);
 
+static bool svc_xprt_slots_in_range(struct svc_xprt *xprt)
+{
+       unsigned int limit = svc_rpc_per_connection_limit;
+       int nrqsts = atomic_read(&xprt->xpt_nr_rqsts);
+
+       return limit == 0 || (nrqsts >= 0 && nrqsts < limit);
+}
+
+static bool svc_xprt_reserve_slot(struct svc_rqst *rqstp, struct svc_xprt *xprt)
+{
+       if (!test_bit(RQ_DATA, &rqstp->rq_flags)) {
+               if (!svc_xprt_slots_in_range(xprt))
+                       return false;
+               atomic_inc(&xprt->xpt_nr_rqsts);
+               set_bit(RQ_DATA, &rqstp->rq_flags);
+       }
+       return true;
+}
+
+static void svc_xprt_release_slot(struct svc_rqst *rqstp)
+{
+       struct svc_xprt *xprt = rqstp->rq_xprt;
+       if (test_and_clear_bit(RQ_DATA, &rqstp->rq_flags)) {
+               atomic_dec(&xprt->xpt_nr_rqsts);
+               svc_xprt_enqueue(xprt);
+       }
+}
+
 static bool svc_xprt_has_something_to_do(struct svc_xprt *xprt)
 {
        if (xprt->xpt_flags & ((1<<XPT_CONN)|(1<<XPT_CLOSE)))
                return true;
        if (xprt->xpt_flags & ((1<<XPT_DATA)|(1<<XPT_DEFERRED))) {
-               if (xprt->xpt_ops->xpo_has_wspace(xprt))
+               if (xprt->xpt_ops->xpo_has_wspace(xprt) &&
+                   svc_xprt_slots_in_range(xprt))
                        return true;
                trace_svc_xprt_no_write_space(xprt);
                return false;
@@ -516,8 +549,8 @@ static void svc_xprt_release(struct svc_rqst *rqstp)
 
        rqstp->rq_res.head[0].iov_len = 0;
        svc_reserve(rqstp, 0);
+       svc_xprt_release_slot(rqstp);
        rqstp->rq_xprt = NULL;
-
        svc_xprt_put(xprt);
 }
 
@@ -785,7 +818,7 @@ static int svc_handle_xprt(struct svc_rqst *rqstp, struct svc_xprt *xprt)
                        svc_add_new_temp_xprt(serv, newxpt);
                else
                        module_put(xprt->xpt_class->xcl_owner);
-       } else {
+       } else if (svc_xprt_reserve_slot(rqstp, xprt)) {
                /* XPT_DATA|XPT_DEFERRED case: */
                dprintk("svc: server %p, pool %u, transport %p, inuse=%d\n",
                        rqstp, rqstp->rq_pool->sp_id, xprt,