rpma: update RPMA engines with new librpma completions API
authorOksana Salyk <oksana.salyk@intel.com>
Fri, 4 Feb 2022 19:00:36 +0000 (14:00 -0500)
committerLukasz Dorau <lukasz.dorau@intel.com>
Fri, 18 Feb 2022 13:59:52 +0000 (14:59 +0100)
The API of librpma has been changed between v0.10.0 and v0.12.0
and fio has to be updated.

Signed-off-by: Oksana Salyk <oksana.salyk@intel.com>
engines/librpma_apm.c
engines/librpma_fio.c
engines/librpma_fio.h
engines/librpma_gpspm.c

index ffa3769d337a1cb571b65dfa5ded416c98ae15b6..d1166ad839f3f1bf326557d7a5d140e01ea56905 100644 (file)
@@ -22,8 +22,7 @@ static inline int client_io_flush(struct thread_data *td,
                struct io_u *first_io_u, struct io_u *last_io_u,
                unsigned long long int len);
 
-static int client_get_io_u_index(struct rpma_completion *cmpl,
-               unsigned int *io_u_index);
+static int client_get_io_u_index(struct ibv_wc *wc, unsigned int *io_u_index);
 
 static int client_init(struct thread_data *td)
 {
@@ -188,10 +187,9 @@ static inline int client_io_flush(struct thread_data *td,
        return 0;
 }
 
-static int client_get_io_u_index(struct rpma_completion *cmpl,
-               unsigned int *io_u_index)
+static int client_get_io_u_index(struct ibv_wc *wc, unsigned int *io_u_index)
 {
-       memcpy(io_u_index, &cmpl->op_context, sizeof(*io_u_index));
+       memcpy(io_u_index, &wc->wr_id, sizeof(*io_u_index));
 
        return 1;
 }
index 9d6ebf38ee865130ae9450dc79719dbbecc3f419..dfd8218006c0f0e8d33741b085daf6291c73517e 100644 (file)
@@ -302,6 +302,12 @@ int librpma_fio_client_init(struct thread_data *td,
        if (ccd->conn == NULL)
                goto err_peer_delete;
 
+       /* get the connection's main CQ */
+       if ((ret = rpma_conn_get_cq(ccd->conn, &ccd->cq))) {
+               librpma_td_verror(td, ret, "rpma_conn_get_cq");
+               goto err_conn_delete;
+       }
+
        /* get the connection's private data sent from the server */
        if ((ret = rpma_conn_get_private_data(ccd->conn, &pdata))) {
                librpma_td_verror(td, ret, "rpma_conn_get_private_data");
@@ -455,7 +461,7 @@ static enum fio_q_status client_queue_sync(struct thread_data *td,
                struct io_u *io_u)
 {
        struct librpma_fio_client_data *ccd = td->io_ops_data;
-       struct rpma_completion cmpl;
+       struct ibv_wc wc;
        unsigned io_u_index;
        int ret;
 
@@ -478,31 +484,31 @@ static enum fio_q_status client_queue_sync(struct thread_data *td,
 
        do {
                /* get a completion */
-               ret = rpma_conn_completion_get(ccd->conn, &cmpl);
+               ret = rpma_cq_get_wc(ccd->cq, 1, &wc, NULL);
                if (ret == RPMA_E_NO_COMPLETION) {
                        /* lack of completion is not an error */
                        continue;
                } else if (ret != 0) {
                        /* an error occurred */
-                       librpma_td_verror(td, ret, "rpma_conn_completion_get");
+                       librpma_td_verror(td, ret, "rpma_cq_get_wc");
                        goto err;
                }
 
                /* if io_us has completed with an error */
-               if (cmpl.op_status != IBV_WC_SUCCESS)
+               if (wc.status != IBV_WC_SUCCESS)
                        goto err;
 
-               if (cmpl.op == RPMA_OP_SEND)
+               if (wc.opcode == IBV_WC_SEND)
                        ++ccd->op_send_completed;
                else {
-                       if (cmpl.op == RPMA_OP_RECV)
+                       if (wc.opcode == IBV_WC_RECV)
                                ++ccd->op_recv_completed;
 
                        break;
                }
        } while (1);
 
-       if (ccd->get_io_u_index(&cmpl, &io_u_index) != 1)
+       if (ccd->get_io_u_index(&wc, &io_u_index) != 1)
                goto err;
 
        if (io_u->index != io_u_index) {
@@ -654,8 +660,8 @@ int librpma_fio_client_commit(struct thread_data *td)
 static int client_getevent_process(struct thread_data *td)
 {
        struct librpma_fio_client_data *ccd = td->io_ops_data;
-       struct rpma_completion cmpl;
-       /* io_u->index of completed io_u (cmpl.op_context) */
+       struct ibv_wc wc;
+       /* io_u->index of completed io_u (wc.wr_id) */
        unsigned int io_u_index;
        /* # of completed io_us */
        int cmpl_num = 0;
@@ -665,7 +671,7 @@ static int client_getevent_process(struct thread_data *td)
        int ret;
 
        /* get a completion */
-       if ((ret = rpma_conn_completion_get(ccd->conn, &cmpl))) {
+       if ((ret = rpma_cq_get_wc(ccd->cq, 1, &wc, NULL))) {
                /* lack of completion is not an error */
                if (ret == RPMA_E_NO_COMPLETION) {
                        /* lack of completion is not an error */
@@ -673,22 +679,22 @@ static int client_getevent_process(struct thread_data *td)
                }
 
                /* an error occurred */
-               librpma_td_verror(td, ret, "rpma_conn_completion_get");
+               librpma_td_verror(td, ret, "rpma_cq_get_wc");
                return -1;
        }
 
        /* if io_us has completed with an error */
-       if (cmpl.op_status != IBV_WC_SUCCESS) {
-               td->error = cmpl.op_status;
+       if (wc.status != IBV_WC_SUCCESS) {
+               td->error = wc.status;
                return -1;
        }
 
-       if (cmpl.op == RPMA_OP_SEND)
+       if (wc.opcode == IBV_WC_SEND)
                ++ccd->op_send_completed;
-       else if (cmpl.op == RPMA_OP_RECV)
+       else if (wc.opcode == IBV_WC_RECV)
                ++ccd->op_recv_completed;
 
-       if ((ret = ccd->get_io_u_index(&cmpl, &io_u_index)) != 1)
+       if ((ret = ccd->get_io_u_index(&wc, &io_u_index)) != 1)
                return ret;
 
        /* look for an io_u being completed */
@@ -750,7 +756,7 @@ int librpma_fio_client_getevents(struct thread_data *td, unsigned int min,
 
                        /*
                         * To reduce CPU consumption one can use
-                        * the rpma_conn_completion_wait() function.
+                        * the rpma_cq_wait() function.
                         * Note this greatly increase the latency
                         * and make the results less stable.
                         * The bandwidth stays more or less the same.
@@ -1029,6 +1035,12 @@ int librpma_fio_server_open_file(struct thread_data *td, struct fio_file *f,
        csd->ws_ptr = ws_ptr;
        csd->conn = conn;
 
+       /* get the connection's main CQ */
+       if ((ret = rpma_conn_get_cq(csd->conn, &csd->cq))) {
+               librpma_td_verror(td, ret, "rpma_conn_get_cq");
+               goto err_conn_delete;
+       }
+
        return 0;
 
 err_conn_delete:
index 2c507e9c5c1c74290a61d10ba04b30d1e3a01d62..912902357de1cc64b6fa324fb9b7b30684ca9856 100644 (file)
@@ -94,12 +94,13 @@ typedef int (*librpma_fio_flush_t)(struct thread_data *td,
  * - ( 0) - skip
  * - (-1) - on error
  */
-typedef int (*librpma_fio_get_io_u_index_t)(struct rpma_completion *cmpl,
+typedef int (*librpma_fio_get_io_u_index_t)(struct ibv_wc *wc,
                unsigned int *io_u_index);
 
 struct librpma_fio_client_data {
        struct rpma_peer *peer;
        struct rpma_conn *conn;
+       struct rpma_cq *cq;
 
        /* aligned td->orig_buffer */
        char *orig_buffer_aligned;
@@ -199,29 +200,29 @@ static inline int librpma_fio_client_io_complete_all_sends(
                struct thread_data *td)
 {
        struct librpma_fio_client_data *ccd = td->io_ops_data;
-       struct rpma_completion cmpl;
+       struct ibv_wc wc;
        int ret;
 
        while (ccd->op_send_posted != ccd->op_send_completed) {
                /* get a completion */
-               ret = rpma_conn_completion_get(ccd->conn, &cmpl);
+               ret = rpma_cq_get_wc(ccd->cq, 1, &wc, NULL);
                if (ret == RPMA_E_NO_COMPLETION) {
                        /* lack of completion is not an error */
                        continue;
                } else if (ret != 0) {
                        /* an error occurred */
-                       librpma_td_verror(td, ret, "rpma_conn_completion_get");
+                       librpma_td_verror(td, ret, "rpma_cq_get_wc");
                        break;
                }
 
-               if (cmpl.op_status != IBV_WC_SUCCESS)
+               if (wc.status != IBV_WC_SUCCESS)
                        return -1;
 
-               if (cmpl.op == RPMA_OP_SEND)
+               if (wc.opcode == IBV_WC_SEND)
                        ++ccd->op_send_completed;
                else {
                        log_err(
-                               "A completion other than RPMA_OP_SEND got during cleaning up the CQ from SENDs\n");
+                               "A completion other than IBV_WC_SEND got during cleaning up the CQ from SENDs\n");
                        return -1;
                }
        }
@@ -251,6 +252,7 @@ struct librpma_fio_server_data {
 
        /* resources of an incoming connection */
        struct rpma_conn *conn;
+       struct rpma_cq *cq;
 
        char *ws_ptr;
        struct rpma_mr_local *ws_mr;
index 7414770971f9d602d521517deb1156ee775bc437..14626e7fce564a3553ed708b072a25cbf5456c06 100644 (file)
@@ -60,8 +60,7 @@ static inline int client_io_flush(struct thread_data *td,
                struct io_u *first_io_u, struct io_u *last_io_u,
                unsigned long long int len);
 
-static int client_get_io_u_index(struct rpma_completion *cmpl,
-               unsigned int *io_u_index);
+static int client_get_io_u_index(struct ibv_wc *wc, unsigned int *io_u_index);
 
 static int client_init(struct thread_data *td)
 {
@@ -317,17 +316,16 @@ static inline int client_io_flush(struct thread_data *td,
        return 0;
 }
 
-static int client_get_io_u_index(struct rpma_completion *cmpl,
-               unsigned int *io_u_index)
+static int client_get_io_u_index(struct ibv_wc *wc, unsigned int *io_u_index)
 {
        GPSPMFlushResponse *flush_resp;
 
-       if (cmpl->op != RPMA_OP_RECV)
+       if (wc->opcode != IBV_WC_RECV)
                return 0;
 
        /* unpack a response from the received buffer */
        flush_resp = gpspm_flush_response__unpack(NULL,
-                       cmpl->byte_len, cmpl->op_context);
+                       wc->byte_len, (void *)wc->wr_id);
        if (flush_resp == NULL) {
                log_err("Cannot unpack the flush response buffer\n");
                return -1;
@@ -373,7 +371,7 @@ struct server_data {
        uint32_t msg_sqe_available; /* # of free SQ slots */
 
        /* in-memory queues */
-       struct rpma_completion *msgs_queued;
+       struct ibv_wc *msgs_queued;
        uint32_t msg_queued_nr;
 };
 
@@ -562,8 +560,7 @@ err_cfg_delete:
        return ret;
 }
 
-static int server_qe_process(struct thread_data *td,
-               struct rpma_completion *cmpl)
+static int server_qe_process(struct thread_data *td, struct ibv_wc *wc)
 {
        struct librpma_fio_server_data *csd = td->io_ops_data;
        struct server_data *sd = csd->server_data;
@@ -580,7 +577,7 @@ static int server_qe_process(struct thread_data *td,
        int ret;
 
        /* calculate SEND/RECV pair parameters */
-       msg_index = (int)(uintptr_t)cmpl->op_context;
+       msg_index = (int)(uintptr_t)wc->wr_id;
        io_u_buff_offset = IO_U_BUFF_OFF_SERVER(msg_index);
        send_buff_offset = io_u_buff_offset + SEND_OFFSET;
        recv_buff_offset = io_u_buff_offset + RECV_OFFSET;
@@ -588,7 +585,7 @@ static int server_qe_process(struct thread_data *td,
        recv_buff_ptr = sd->orig_buffer_aligned + recv_buff_offset;
 
        /* unpack a flush request from the received buffer */
-       flush_req = gpspm_flush_request__unpack(NULL, cmpl->byte_len,
+       flush_req = gpspm_flush_request__unpack(NULL, wc->byte_len,
                        recv_buff_ptr);
        if (flush_req == NULL) {
                log_err("cannot unpack the flush request buffer\n");
@@ -682,28 +679,28 @@ static int server_cmpl_process(struct thread_data *td)
 {
        struct librpma_fio_server_data *csd = td->io_ops_data;
        struct server_data *sd = csd->server_data;
-       struct rpma_completion *cmpl = &sd->msgs_queued[sd->msg_queued_nr];
+       struct ibv_wc *wc = &sd->msgs_queued[sd->msg_queued_nr];
        struct librpma_fio_options_values *o = td->eo;
        int ret;
 
-       ret = rpma_conn_completion_get(csd->conn, cmpl);
+       ret = rpma_cq_get_wc(csd->cq, 1, wc, NULL);
        if (ret == RPMA_E_NO_COMPLETION) {
                if (o->busy_wait_polling == 0) {
-                       ret = rpma_conn_completion_wait(csd->conn);
+                       ret = rpma_cq_wait(csd->cq);
                        if (ret == RPMA_E_NO_COMPLETION) {
                                /* lack of completion is not an error */
                                return 0;
                        } else if (ret != 0) {
-                               librpma_td_verror(td, ret, "rpma_conn_completion_wait");
+                               librpma_td_verror(td, ret, "rpma_cq_wait");
                                goto err_terminate;
                        }
 
-                       ret = rpma_conn_completion_get(csd->conn, cmpl);
+                       ret = rpma_cq_get_wc(csd->cq, 1, wc, NULL);
                        if (ret == RPMA_E_NO_COMPLETION) {
                                /* lack of completion is not an error */
                                return 0;
                        } else if (ret != 0) {
-                               librpma_td_verror(td, ret, "rpma_conn_completion_get");
+                               librpma_td_verror(td, ret, "rpma_cq_get_wc");
                                goto err_terminate;
                        }
                } else {
@@ -711,17 +708,17 @@ static int server_cmpl_process(struct thread_data *td)
                        return 0;
                }
        } else if (ret != 0) {
-               librpma_td_verror(td, ret, "rpma_conn_completion_get");
+               librpma_td_verror(td, ret, "rpma_cq_get_wc");
                goto err_terminate;
        }
 
        /* validate the completion */
-       if (cmpl->op_status != IBV_WC_SUCCESS)
+       if (wc->status != IBV_WC_SUCCESS)
                goto err_terminate;
 
-       if (cmpl->op == RPMA_OP_RECV)
+       if (wc->opcode == IBV_WC_RECV)
                ++sd->msg_queued_nr;
-       else if (cmpl->op == RPMA_OP_SEND)
+       else if (wc->opcode == IBV_WC_SEND)
                ++sd->msg_sqe_available;
 
        return 0;