nvmet-rdma: use SRQ per completion vector
authorMax Gurtovoy <maxg@mellanox.com>
Wed, 19 Apr 2017 08:56:57 +0000 (11:56 +0300)
committerJens Axboe <axboe@kernel.dk>
Sat, 9 May 2020 22:18:35 +0000 (16:18 -0600)
In order to save resource allocation and utilize the completion
locality in a better way (compared to SRQ per device that exist today),
allocate Shared Receive Queues (SRQs) per completion vector. Associate
each created QP/CQ with an appropriate SRQ according to the queue index.
This association will reduce the lock contention in the fast path
(compared to SRQ per device solution) and increase the locality in
memory buffers. Add new module parameter for SRQ size to adjust it
according to the expected load. User should make sure the size is >= 256
to avoid lack of resources. Also reduce the debug level of "last WQE
reached" event that is raised when a QP is using SRQ during destruction
process to relief the log.

Signed-off-by: Max Gurtovoy <maxg@mellanox.com>
Reviewed-by: Sagi Grimberg <sagi@grimberg.me>
Signed-off-by: Christoph Hellwig <hch@lst.de>
Signed-off-by: Jens Axboe <axboe@kernel.dk>
drivers/nvme/target/rdma.c

index fd47de0e4e4e543eac0aca176e3ffa4d0c5b7fbb..7a90b10359bbda0a6108137afbae43f5a1394496 100644 (file)
@@ -34,6 +34,8 @@
 /* Assume mpsmin == device_page_size == 4KB */
 #define NVMET_RDMA_MAX_MDTS                    8
 
+struct nvmet_rdma_srq;
+
 struct nvmet_rdma_cmd {
        struct ib_sge           sge[NVMET_RDMA_MAX_INLINE_SGE + 1];
        struct ib_cqe           cqe;
@@ -41,6 +43,7 @@ struct nvmet_rdma_cmd {
        struct scatterlist      inline_sg[NVMET_RDMA_MAX_INLINE_SGE];
        struct nvme_command     *nvme_cmd;
        struct nvmet_rdma_queue *queue;
+       struct nvmet_rdma_srq   *nsrq;
 };
 
 enum {
@@ -83,6 +86,7 @@ struct nvmet_rdma_queue {
        struct ib_cq            *cq;
        atomic_t                sq_wr_avail;
        struct nvmet_rdma_device *dev;
+       struct nvmet_rdma_srq   *nsrq;
        spinlock_t              state_lock;
        enum nvmet_rdma_queue_state state;
        struct nvmet_cq         nvme_cq;
@@ -100,6 +104,7 @@ struct nvmet_rdma_queue {
 
        int                     idx;
        int                     host_qid;
+       int                     comp_vector;
        int                     recv_queue_size;
        int                     send_queue_size;
 
@@ -113,11 +118,17 @@ struct nvmet_rdma_port {
        struct delayed_work     repair_work;
 };
 
+struct nvmet_rdma_srq {
+       struct ib_srq            *srq;
+       struct nvmet_rdma_cmd    *cmds;
+       struct nvmet_rdma_device *ndev;
+};
+
 struct nvmet_rdma_device {
        struct ib_device        *device;
        struct ib_pd            *pd;
-       struct ib_srq           *srq;
-       struct nvmet_rdma_cmd   *srq_cmds;
+       struct nvmet_rdma_srq   **srqs;
+       int                     srq_count;
        size_t                  srq_size;
        struct kref             ref;
        struct list_head        entry;
@@ -129,6 +140,16 @@ static bool nvmet_rdma_use_srq;
 module_param_named(use_srq, nvmet_rdma_use_srq, bool, 0444);
 MODULE_PARM_DESC(use_srq, "Use shared receive queue.");
 
+static int srq_size_set(const char *val, const struct kernel_param *kp);
+static const struct kernel_param_ops srq_size_ops = {
+       .set = srq_size_set,
+       .get = param_get_int,
+};
+
+static int nvmet_rdma_srq_size = 1024;
+module_param_cb(srq_size, &srq_size_ops, &nvmet_rdma_srq_size, 0644);
+MODULE_PARM_DESC(srq_size, "set Shared Receive Queue (SRQ) size, should >= 256 (default: 1024)");
+
 static DEFINE_IDA(nvmet_rdma_queue_ida);
 static LIST_HEAD(nvmet_rdma_queue_list);
 static DEFINE_MUTEX(nvmet_rdma_queue_mutex);
@@ -149,6 +170,17 @@ static int nvmet_rdma_alloc_rsp(struct nvmet_rdma_device *ndev,
 
 static const struct nvmet_fabrics_ops nvmet_rdma_ops;
 
+static int srq_size_set(const char *val, const struct kernel_param *kp)
+{
+       int n = 0, ret;
+
+       ret = kstrtoint(val, 10, &n);
+       if (ret != 0 || n < 256)
+               return -EINVAL;
+
+       return param_set_int(val, kp);
+}
+
 static int num_pages(int len)
 {
        return 1 + (((len - 1) & PAGE_MASK) >> PAGE_SHIFT);
@@ -466,8 +498,8 @@ static int nvmet_rdma_post_recv(struct nvmet_rdma_device *ndev,
                cmd->sge[0].addr, cmd->sge[0].length,
                DMA_FROM_DEVICE);
 
-       if (ndev->srq)
-               ret = ib_post_srq_recv(ndev->srq, &cmd->wr, NULL);
+       if (cmd->nsrq)
+               ret = ib_post_srq_recv(cmd->nsrq->srq, &cmd->wr, NULL);
        else
                ret = ib_post_recv(cmd->queue->qp, &cmd->wr, NULL);
 
@@ -845,23 +877,40 @@ static void nvmet_rdma_recv_done(struct ib_cq *cq, struct ib_wc *wc)
        nvmet_rdma_handle_command(queue, rsp);
 }
 
-static void nvmet_rdma_destroy_srq(struct nvmet_rdma_device *ndev)
+static void nvmet_rdma_destroy_srq(struct nvmet_rdma_srq *nsrq)
+{
+       nvmet_rdma_free_cmds(nsrq->ndev, nsrq->cmds, nsrq->ndev->srq_size,
+                            false);
+       ib_destroy_srq(nsrq->srq);
+
+       kfree(nsrq);
+}
+
+static void nvmet_rdma_destroy_srqs(struct nvmet_rdma_device *ndev)
 {
-       if (!ndev->srq)
+       int i;
+
+       if (!ndev->srqs)
                return;
 
-       nvmet_rdma_free_cmds(ndev, ndev->srq_cmds, ndev->srq_size, false);
-       ib_destroy_srq(ndev->srq);
+       for (i = 0; i < ndev->srq_count; i++)
+               nvmet_rdma_destroy_srq(ndev->srqs[i]);
+
+       kfree(ndev->srqs);
 }
 
-static int nvmet_rdma_init_srq(struct nvmet_rdma_device *ndev)
+static struct nvmet_rdma_srq *
+nvmet_rdma_init_srq(struct nvmet_rdma_device *ndev)
 {
        struct ib_srq_init_attr srq_attr = { NULL, };
+       size_t srq_size = ndev->srq_size;
+       struct nvmet_rdma_srq *nsrq;
        struct ib_srq *srq;
-       size_t srq_size;
        int ret, i;
 
-       srq_size = 4095;        /* XXX: tune */
+       nsrq = kzalloc(sizeof(*nsrq), GFP_KERNEL);
+       if (!nsrq)
+               return ERR_PTR(-ENOMEM);
 
        srq_attr.attr.max_wr = srq_size;
        srq_attr.attr.max_sge = 1 + ndev->inline_page_count;
@@ -869,35 +918,73 @@ static int nvmet_rdma_init_srq(struct nvmet_rdma_device *ndev)
        srq_attr.srq_type = IB_SRQT_BASIC;
        srq = ib_create_srq(ndev->pd, &srq_attr);
        if (IS_ERR(srq)) {
-               /*
-                * If SRQs aren't supported we just go ahead and use normal
-                * non-shared receive queues.
-                */
-               pr_info("SRQ requested but not supported.\n");
-               return 0;
+               ret = PTR_ERR(srq);
+               goto out_free;
        }
 
-       ndev->srq_cmds = nvmet_rdma_alloc_cmds(ndev, srq_size, false);
-       if (IS_ERR(ndev->srq_cmds)) {
-               ret = PTR_ERR(ndev->srq_cmds);
+       nsrq->cmds = nvmet_rdma_alloc_cmds(ndev, srq_size, false);
+       if (IS_ERR(nsrq->cmds)) {
+               ret = PTR_ERR(nsrq->cmds);
                goto out_destroy_srq;
        }
 
-       ndev->srq = srq;
-       ndev->srq_size = srq_size;
+       nsrq->srq = srq;
+       nsrq->ndev = ndev;
 
        for (i = 0; i < srq_size; i++) {
-               ret = nvmet_rdma_post_recv(ndev, &ndev->srq_cmds[i]);
+               nsrq->cmds[i].nsrq = nsrq;
+               ret = nvmet_rdma_post_recv(ndev, &nsrq->cmds[i]);
                if (ret)
                        goto out_free_cmds;
        }
 
-       return 0;
+       return nsrq;
 
 out_free_cmds:
-       nvmet_rdma_free_cmds(ndev, ndev->srq_cmds, ndev->srq_size, false);
+       nvmet_rdma_free_cmds(ndev, nsrq->cmds, srq_size, false);
 out_destroy_srq:
        ib_destroy_srq(srq);
+out_free:
+       kfree(nsrq);
+       return ERR_PTR(ret);
+}
+
+static int nvmet_rdma_init_srqs(struct nvmet_rdma_device *ndev)
+{
+       int i, ret;
+
+       if (!ndev->device->attrs.max_srq_wr || !ndev->device->attrs.max_srq) {
+               /*
+                * If SRQs aren't supported we just go ahead and use normal
+                * non-shared receive queues.
+                */
+               pr_info("SRQ requested but not supported.\n");
+               return 0;
+       }
+
+       ndev->srq_size = min(ndev->device->attrs.max_srq_wr,
+                            nvmet_rdma_srq_size);
+       ndev->srq_count = min(ndev->device->num_comp_vectors,
+                             ndev->device->attrs.max_srq);
+
+       ndev->srqs = kcalloc(ndev->srq_count, sizeof(*ndev->srqs), GFP_KERNEL);
+       if (!ndev->srqs)
+               return -ENOMEM;
+
+       for (i = 0; i < ndev->srq_count; i++) {
+               ndev->srqs[i] = nvmet_rdma_init_srq(ndev);
+               if (IS_ERR(ndev->srqs[i])) {
+                       ret = PTR_ERR(ndev->srqs[i]);
+                       goto err_srq;
+               }
+       }
+
+       return 0;
+
+err_srq:
+       while (--i >= 0)
+               nvmet_rdma_destroy_srq(ndev->srqs[i]);
+       kfree(ndev->srqs);
        return ret;
 }
 
@@ -910,7 +997,7 @@ static void nvmet_rdma_free_dev(struct kref *ref)
        list_del(&ndev->entry);
        mutex_unlock(&device_list_mutex);
 
-       nvmet_rdma_destroy_srq(ndev);
+       nvmet_rdma_destroy_srqs(ndev);
        ib_dealloc_pd(ndev->pd);
 
        kfree(ndev);
@@ -957,7 +1044,7 @@ nvmet_rdma_find_get_device(struct rdma_cm_id *cm_id)
                goto out_free_dev;
 
        if (nvmet_rdma_use_srq) {
-               ret = nvmet_rdma_init_srq(ndev);
+               ret = nvmet_rdma_init_srqs(ndev);
                if (ret)
                        goto out_free_pd;
        }
@@ -981,14 +1068,7 @@ static int nvmet_rdma_create_queue_ib(struct nvmet_rdma_queue *queue)
 {
        struct ib_qp_init_attr qp_attr;
        struct nvmet_rdma_device *ndev = queue->dev;
-       int comp_vector, nr_cqe, ret, i, factor;
-
-       /*
-        * Spread the io queues across completion vectors,
-        * but still keep all admin queues on vector 0.
-        */
-       comp_vector = !queue->host_qid ? 0 :
-               queue->idx % ndev->device->num_comp_vectors;
+       int nr_cqe, ret, i, factor;
 
        /*
         * Reserve CQ slots for RECV + RDMA_READ/RDMA_WRITE + RDMA_SEND.
@@ -996,7 +1076,7 @@ static int nvmet_rdma_create_queue_ib(struct nvmet_rdma_queue *queue)
        nr_cqe = queue->recv_queue_size + 2 * queue->send_queue_size;
 
        queue->cq = ib_alloc_cq(ndev->device, queue,
-                       nr_cqe + 1, comp_vector,
+                       nr_cqe + 1, queue->comp_vector,
                        IB_POLL_WORKQUEUE);
        if (IS_ERR(queue->cq)) {
                ret = PTR_ERR(queue->cq);
@@ -1020,8 +1100,8 @@ static int nvmet_rdma_create_queue_ib(struct nvmet_rdma_queue *queue)
        qp_attr.cap.max_send_sge = max(ndev->device->attrs.max_sge_rd,
                                        ndev->device->attrs.max_send_sge);
 
-       if (ndev->srq) {
-               qp_attr.srq = ndev->srq;
+       if (queue->nsrq) {
+               qp_attr.srq = queue->nsrq->srq;
        } else {
                /* +1 for drain */
                qp_attr.cap.max_recv_wr = 1 + queue->recv_queue_size;
@@ -1041,7 +1121,7 @@ static int nvmet_rdma_create_queue_ib(struct nvmet_rdma_queue *queue)
                 __func__, queue->cq->cqe, qp_attr.cap.max_send_sge,
                 qp_attr.cap.max_send_wr, queue->cm_id);
 
-       if (!ndev->srq) {
+       if (!queue->nsrq) {
                for (i = 0; i < queue->recv_queue_size; i++) {
                        queue->cmds[i].queue = queue;
                        ret = nvmet_rdma_post_recv(ndev, &queue->cmds[i]);
@@ -1076,7 +1156,7 @@ static void nvmet_rdma_free_queue(struct nvmet_rdma_queue *queue)
        nvmet_sq_destroy(&queue->nvme_sq);
 
        nvmet_rdma_destroy_queue_ib(queue);
-       if (!queue->dev->srq) {
+       if (!queue->nsrq) {
                nvmet_rdma_free_cmds(queue->dev, queue->cmds,
                                queue->recv_queue_size,
                                !queue->host_qid);
@@ -1188,13 +1268,23 @@ nvmet_rdma_alloc_queue(struct nvmet_rdma_device *ndev,
                goto out_destroy_sq;
        }
 
+       /*
+        * Spread the io queues across completion vectors,
+        * but still keep all admin queues on vector 0.
+        */
+       queue->comp_vector = !queue->host_qid ? 0 :
+               queue->idx % ndev->device->num_comp_vectors;
+
+
        ret = nvmet_rdma_alloc_rsps(queue);
        if (ret) {
                ret = NVME_RDMA_CM_NO_RSC;
                goto out_ida_remove;
        }
 
-       if (!ndev->srq) {
+       if (ndev->srqs) {
+               queue->nsrq = ndev->srqs[queue->comp_vector % ndev->srq_count];
+       } else {
                queue->cmds = nvmet_rdma_alloc_cmds(ndev,
                                queue->recv_queue_size,
                                !queue->host_qid);
@@ -1215,7 +1305,7 @@ nvmet_rdma_alloc_queue(struct nvmet_rdma_device *ndev,
        return queue;
 
 out_free_cmds:
-       if (!ndev->srq) {
+       if (!queue->nsrq) {
                nvmet_rdma_free_cmds(queue->dev, queue->cmds,
                                queue->recv_queue_size,
                                !queue->host_qid);
@@ -1241,6 +1331,10 @@ static void nvmet_rdma_qp_event(struct ib_event *event, void *priv)
        case IB_EVENT_COMM_EST:
                rdma_notify(queue->cm_id, event->event);
                break;
+       case IB_EVENT_QP_LAST_WQE_REACHED:
+               pr_debug("received last WQE reached event for queue=0x%p\n",
+                        queue);
+               break;
        default:
                pr_err("received IB QP event: %s (%d)\n",
                       ib_event_msg(event->event), event->event);