From 4ef7dd21b8a960855aa9d9c1e6417509bbfa05a9 Mon Sep 17 00:00:00 2001 From: Oksana Salyk Date: Fri, 4 Feb 2022 14:00:36 -0500 Subject: [PATCH] rpma: update RPMA engines with new librpma completions API The API of librpma has been changed between v0.10.0 and v0.12.0 and fio has to be updated. Signed-off-by: Oksana Salyk --- engines/librpma_apm.c | 8 +++---- engines/librpma_fio.c | 46 ++++++++++++++++++++++++++--------------- engines/librpma_fio.h | 16 +++++++------- engines/librpma_gpspm.c | 39 ++++++++++++++++------------------ 4 files changed, 59 insertions(+), 50 deletions(-) diff --git a/engines/librpma_apm.c b/engines/librpma_apm.c index ffa3769d..d1166ad8 100644 --- a/engines/librpma_apm.c +++ b/engines/librpma_apm.c @@ -22,8 +22,7 @@ static inline int client_io_flush(struct thread_data *td, struct io_u *first_io_u, struct io_u *last_io_u, unsigned long long int len); -static int client_get_io_u_index(struct rpma_completion *cmpl, - unsigned int *io_u_index); +static int client_get_io_u_index(struct ibv_wc *wc, unsigned int *io_u_index); static int client_init(struct thread_data *td) { @@ -188,10 +187,9 @@ static inline int client_io_flush(struct thread_data *td, return 0; } -static int client_get_io_u_index(struct rpma_completion *cmpl, - unsigned int *io_u_index) +static int client_get_io_u_index(struct ibv_wc *wc, unsigned int *io_u_index) { - memcpy(io_u_index, &cmpl->op_context, sizeof(*io_u_index)); + memcpy(io_u_index, &wc->wr_id, sizeof(*io_u_index)); return 1; } diff --git a/engines/librpma_fio.c b/engines/librpma_fio.c index 9d6ebf38..dfd82180 100644 --- a/engines/librpma_fio.c +++ b/engines/librpma_fio.c @@ -302,6 +302,12 @@ int librpma_fio_client_init(struct thread_data *td, if (ccd->conn == NULL) goto err_peer_delete; + /* get the connection's main CQ */ + if ((ret = rpma_conn_get_cq(ccd->conn, &ccd->cq))) { + librpma_td_verror(td, ret, "rpma_conn_get_cq"); + goto err_conn_delete; + } + /* get the connection's private data sent from the server */ if ((ret = rpma_conn_get_private_data(ccd->conn, &pdata))) { librpma_td_verror(td, ret, "rpma_conn_get_private_data"); @@ -455,7 +461,7 @@ static enum fio_q_status client_queue_sync(struct thread_data *td, struct io_u *io_u) { struct librpma_fio_client_data *ccd = td->io_ops_data; - struct rpma_completion cmpl; + struct ibv_wc wc; unsigned io_u_index; int ret; @@ -478,31 +484,31 @@ static enum fio_q_status client_queue_sync(struct thread_data *td, do { /* get a completion */ - ret = rpma_conn_completion_get(ccd->conn, &cmpl); + ret = rpma_cq_get_wc(ccd->cq, 1, &wc, NULL); if (ret == RPMA_E_NO_COMPLETION) { /* lack of completion is not an error */ continue; } else if (ret != 0) { /* an error occurred */ - librpma_td_verror(td, ret, "rpma_conn_completion_get"); + librpma_td_verror(td, ret, "rpma_cq_get_wc"); goto err; } /* if io_us has completed with an error */ - if (cmpl.op_status != IBV_WC_SUCCESS) + if (wc.status != IBV_WC_SUCCESS) goto err; - if (cmpl.op == RPMA_OP_SEND) + if (wc.opcode == IBV_WC_SEND) ++ccd->op_send_completed; else { - if (cmpl.op == RPMA_OP_RECV) + if (wc.opcode == IBV_WC_RECV) ++ccd->op_recv_completed; break; } } while (1); - if (ccd->get_io_u_index(&cmpl, &io_u_index) != 1) + if (ccd->get_io_u_index(&wc, &io_u_index) != 1) goto err; if (io_u->index != io_u_index) { @@ -654,8 +660,8 @@ int librpma_fio_client_commit(struct thread_data *td) static int client_getevent_process(struct thread_data *td) { struct librpma_fio_client_data *ccd = td->io_ops_data; - struct rpma_completion cmpl; - /* io_u->index of completed io_u (cmpl.op_context) */ + struct ibv_wc wc; + /* io_u->index of completed io_u (wc.wr_id) */ unsigned int io_u_index; /* # of completed io_us */ int cmpl_num = 0; @@ -665,7 +671,7 @@ static int client_getevent_process(struct thread_data *td) int ret; /* get a completion */ - if ((ret = rpma_conn_completion_get(ccd->conn, &cmpl))) { + if ((ret = rpma_cq_get_wc(ccd->cq, 1, &wc, NULL))) { /* lack of completion is not an error */ if (ret == RPMA_E_NO_COMPLETION) { /* lack of completion is not an error */ @@ -673,22 +679,22 @@ static int client_getevent_process(struct thread_data *td) } /* an error occurred */ - librpma_td_verror(td, ret, "rpma_conn_completion_get"); + librpma_td_verror(td, ret, "rpma_cq_get_wc"); return -1; } /* if io_us has completed with an error */ - if (cmpl.op_status != IBV_WC_SUCCESS) { - td->error = cmpl.op_status; + if (wc.status != IBV_WC_SUCCESS) { + td->error = wc.status; return -1; } - if (cmpl.op == RPMA_OP_SEND) + if (wc.opcode == IBV_WC_SEND) ++ccd->op_send_completed; - else if (cmpl.op == RPMA_OP_RECV) + else if (wc.opcode == IBV_WC_RECV) ++ccd->op_recv_completed; - if ((ret = ccd->get_io_u_index(&cmpl, &io_u_index)) != 1) + if ((ret = ccd->get_io_u_index(&wc, &io_u_index)) != 1) return ret; /* look for an io_u being completed */ @@ -750,7 +756,7 @@ int librpma_fio_client_getevents(struct thread_data *td, unsigned int min, /* * To reduce CPU consumption one can use - * the rpma_conn_completion_wait() function. + * the rpma_cq_wait() function. * Note this greatly increase the latency * and make the results less stable. * The bandwidth stays more or less the same. @@ -1029,6 +1035,12 @@ int librpma_fio_server_open_file(struct thread_data *td, struct fio_file *f, csd->ws_ptr = ws_ptr; csd->conn = conn; + /* get the connection's main CQ */ + if ((ret = rpma_conn_get_cq(csd->conn, &csd->cq))) { + librpma_td_verror(td, ret, "rpma_conn_get_cq"); + goto err_conn_delete; + } + return 0; err_conn_delete: diff --git a/engines/librpma_fio.h b/engines/librpma_fio.h index 2c507e9c..91290235 100644 --- a/engines/librpma_fio.h +++ b/engines/librpma_fio.h @@ -94,12 +94,13 @@ typedef int (*librpma_fio_flush_t)(struct thread_data *td, * - ( 0) - skip * - (-1) - on error */ -typedef int (*librpma_fio_get_io_u_index_t)(struct rpma_completion *cmpl, +typedef int (*librpma_fio_get_io_u_index_t)(struct ibv_wc *wc, unsigned int *io_u_index); struct librpma_fio_client_data { struct rpma_peer *peer; struct rpma_conn *conn; + struct rpma_cq *cq; /* aligned td->orig_buffer */ char *orig_buffer_aligned; @@ -199,29 +200,29 @@ static inline int librpma_fio_client_io_complete_all_sends( struct thread_data *td) { struct librpma_fio_client_data *ccd = td->io_ops_data; - struct rpma_completion cmpl; + struct ibv_wc wc; int ret; while (ccd->op_send_posted != ccd->op_send_completed) { /* get a completion */ - ret = rpma_conn_completion_get(ccd->conn, &cmpl); + ret = rpma_cq_get_wc(ccd->cq, 1, &wc, NULL); if (ret == RPMA_E_NO_COMPLETION) { /* lack of completion is not an error */ continue; } else if (ret != 0) { /* an error occurred */ - librpma_td_verror(td, ret, "rpma_conn_completion_get"); + librpma_td_verror(td, ret, "rpma_cq_get_wc"); break; } - if (cmpl.op_status != IBV_WC_SUCCESS) + if (wc.status != IBV_WC_SUCCESS) return -1; - if (cmpl.op == RPMA_OP_SEND) + if (wc.opcode == IBV_WC_SEND) ++ccd->op_send_completed; else { log_err( - "A completion other than RPMA_OP_SEND got during cleaning up the CQ from SENDs\n"); + "A completion other than IBV_WC_SEND got during cleaning up the CQ from SENDs\n"); return -1; } } @@ -251,6 +252,7 @@ struct librpma_fio_server_data { /* resources of an incoming connection */ struct rpma_conn *conn; + struct rpma_cq *cq; char *ws_ptr; struct rpma_mr_local *ws_mr; diff --git a/engines/librpma_gpspm.c b/engines/librpma_gpspm.c index 74147709..14626e7f 100644 --- a/engines/librpma_gpspm.c +++ b/engines/librpma_gpspm.c @@ -60,8 +60,7 @@ static inline int client_io_flush(struct thread_data *td, struct io_u *first_io_u, struct io_u *last_io_u, unsigned long long int len); -static int client_get_io_u_index(struct rpma_completion *cmpl, - unsigned int *io_u_index); +static int client_get_io_u_index(struct ibv_wc *wc, unsigned int *io_u_index); static int client_init(struct thread_data *td) { @@ -317,17 +316,16 @@ static inline int client_io_flush(struct thread_data *td, return 0; } -static int client_get_io_u_index(struct rpma_completion *cmpl, - unsigned int *io_u_index) +static int client_get_io_u_index(struct ibv_wc *wc, unsigned int *io_u_index) { GPSPMFlushResponse *flush_resp; - if (cmpl->op != RPMA_OP_RECV) + if (wc->opcode != IBV_WC_RECV) return 0; /* unpack a response from the received buffer */ flush_resp = gpspm_flush_response__unpack(NULL, - cmpl->byte_len, cmpl->op_context); + wc->byte_len, (void *)wc->wr_id); if (flush_resp == NULL) { log_err("Cannot unpack the flush response buffer\n"); return -1; @@ -373,7 +371,7 @@ struct server_data { uint32_t msg_sqe_available; /* # of free SQ slots */ /* in-memory queues */ - struct rpma_completion *msgs_queued; + struct ibv_wc *msgs_queued; uint32_t msg_queued_nr; }; @@ -562,8 +560,7 @@ err_cfg_delete: return ret; } -static int server_qe_process(struct thread_data *td, - struct rpma_completion *cmpl) +static int server_qe_process(struct thread_data *td, struct ibv_wc *wc) { struct librpma_fio_server_data *csd = td->io_ops_data; struct server_data *sd = csd->server_data; @@ -580,7 +577,7 @@ static int server_qe_process(struct thread_data *td, int ret; /* calculate SEND/RECV pair parameters */ - msg_index = (int)(uintptr_t)cmpl->op_context; + msg_index = (int)(uintptr_t)wc->wr_id; io_u_buff_offset = IO_U_BUFF_OFF_SERVER(msg_index); send_buff_offset = io_u_buff_offset + SEND_OFFSET; recv_buff_offset = io_u_buff_offset + RECV_OFFSET; @@ -588,7 +585,7 @@ static int server_qe_process(struct thread_data *td, recv_buff_ptr = sd->orig_buffer_aligned + recv_buff_offset; /* unpack a flush request from the received buffer */ - flush_req = gpspm_flush_request__unpack(NULL, cmpl->byte_len, + flush_req = gpspm_flush_request__unpack(NULL, wc->byte_len, recv_buff_ptr); if (flush_req == NULL) { log_err("cannot unpack the flush request buffer\n"); @@ -682,28 +679,28 @@ static int server_cmpl_process(struct thread_data *td) { struct librpma_fio_server_data *csd = td->io_ops_data; struct server_data *sd = csd->server_data; - struct rpma_completion *cmpl = &sd->msgs_queued[sd->msg_queued_nr]; + struct ibv_wc *wc = &sd->msgs_queued[sd->msg_queued_nr]; struct librpma_fio_options_values *o = td->eo; int ret; - ret = rpma_conn_completion_get(csd->conn, cmpl); + ret = rpma_cq_get_wc(csd->cq, 1, wc, NULL); if (ret == RPMA_E_NO_COMPLETION) { if (o->busy_wait_polling == 0) { - ret = rpma_conn_completion_wait(csd->conn); + ret = rpma_cq_wait(csd->cq); if (ret == RPMA_E_NO_COMPLETION) { /* lack of completion is not an error */ return 0; } else if (ret != 0) { - librpma_td_verror(td, ret, "rpma_conn_completion_wait"); + librpma_td_verror(td, ret, "rpma_cq_wait"); goto err_terminate; } - ret = rpma_conn_completion_get(csd->conn, cmpl); + ret = rpma_cq_get_wc(csd->cq, 1, wc, NULL); if (ret == RPMA_E_NO_COMPLETION) { /* lack of completion is not an error */ return 0; } else if (ret != 0) { - librpma_td_verror(td, ret, "rpma_conn_completion_get"); + librpma_td_verror(td, ret, "rpma_cq_get_wc"); goto err_terminate; } } else { @@ -711,17 +708,17 @@ static int server_cmpl_process(struct thread_data *td) return 0; } } else if (ret != 0) { - librpma_td_verror(td, ret, "rpma_conn_completion_get"); + librpma_td_verror(td, ret, "rpma_cq_get_wc"); goto err_terminate; } /* validate the completion */ - if (cmpl->op_status != IBV_WC_SUCCESS) + if (wc->status != IBV_WC_SUCCESS) goto err_terminate; - if (cmpl->op == RPMA_OP_RECV) + if (wc->opcode == IBV_WC_RECV) ++sd->msg_queued_nr; - else if (cmpl->op == RPMA_OP_SEND) + else if (wc->opcode == IBV_WC_SEND) ++sd->msg_sqe_available; return 0; -- 2.25.1