* librpma_gpspm: IO engine that uses PMDK librpma to write data,
* based on General Purpose Server Persistency Method
*
- * Copyright 2020-2021, Intel Corporation
+ * Copyright 2020-2022, Intel Corporation
*
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License,
#include "librpma_fio.h"
+#ifdef CONFIG_LIBPMEM2_INSTALLED
+#include <libpmem2.h>
+#else
#include <libpmem.h>
+#endif
/* Generated by the protocol buffer compiler from: librpma_gpspm_flush.proto */
#include "librpma_gpspm_flush.pb-c.h"
struct io_u *first_io_u, struct io_u *last_io_u,
unsigned long long int len);
-static int client_get_io_u_index(struct rpma_completion *cmpl,
- unsigned int *io_u_index);
+static int client_get_io_u_index(struct ibv_wc *wc, unsigned int *io_u_index);
static int client_init(struct thread_data *td)
{
return 0;
}
-static int client_get_io_u_index(struct rpma_completion *cmpl,
- unsigned int *io_u_index)
+static int client_get_io_u_index(struct ibv_wc *wc, unsigned int *io_u_index)
{
GPSPMFlushResponse *flush_resp;
- if (cmpl->op != RPMA_OP_RECV)
+ if (wc->opcode != IBV_WC_RECV)
return 0;
/* unpack a response from the received buffer */
flush_resp = gpspm_flush_response__unpack(NULL,
- cmpl->byte_len, cmpl->op_context);
+ wc->byte_len, (void *)wc->wr_id);
if (flush_resp == NULL) {
log_err("Cannot unpack the flush response buffer\n");
return -1;
.errdetails = librpma_fio_client_errdetails,
.close_file = librpma_fio_file_nop,
.cleanup = client_cleanup,
- .flags = FIO_DISKLESSIO,
+ .flags = FIO_DISKLESSIO | FIO_ASYNCIO_SETS_ISSUE_TIME,
.options = librpma_fio_options,
.option_struct_size = sizeof(struct librpma_fio_options_values),
};
#define IO_U_BUFF_OFF_SERVER(i) (i * IO_U_BUF_LEN)
+typedef void (*librpma_fio_persist_fn)(const void *ptr, size_t size);
+
struct server_data {
/* aligned td->orig_buffer */
char *orig_buffer_aligned;
uint32_t msg_sqe_available; /* # of free SQ slots */
/* in-memory queues */
- struct rpma_completion *msgs_queued;
+ struct ibv_wc *msgs_queued;
uint32_t msg_queued_nr;
+
+ librpma_fio_persist_fn persist;
};
static int server_init(struct thread_data *td)
goto err_free_sd;
}
+#ifdef CONFIG_LIBPMEM2_INSTALLED
+ /* get libpmem2 persist function from pmem2_map */
+ sd->persist = pmem2_get_persist_fn(csd->mem.map);
+#else
+ sd->persist = pmem_persist;
+#endif
+
/*
* Assure a single io_u buffer can store both SEND and RECV messages and
* an io_us buffer allocation is page-size-aligned which is required
/*
* td->orig_buffer is not aligned. The engine requires aligned io_us
- * so FIO alignes up the address using the formula below.
+ * so FIO aligns up the address using the formula below.
*/
sd->orig_buffer_aligned = PTR_ALIGN(td->orig_buffer, page_mask) +
td->o.mem_align;
return ret;
}
-static int server_qe_process(struct thread_data *td,
- struct rpma_completion *cmpl)
+static int server_qe_process(struct thread_data *td, struct ibv_wc *wc)
{
struct librpma_fio_server_data *csd = td->io_ops_data;
struct server_data *sd = csd->server_data;
int ret;
/* calculate SEND/RECV pair parameters */
- msg_index = (int)(uintptr_t)cmpl->op_context;
+ msg_index = (int)(uintptr_t)wc->wr_id;
io_u_buff_offset = IO_U_BUFF_OFF_SERVER(msg_index);
send_buff_offset = io_u_buff_offset + SEND_OFFSET;
recv_buff_offset = io_u_buff_offset + RECV_OFFSET;
recv_buff_ptr = sd->orig_buffer_aligned + recv_buff_offset;
/* unpack a flush request from the received buffer */
- flush_req = gpspm_flush_request__unpack(NULL, cmpl->byte_len,
+ flush_req = gpspm_flush_request__unpack(NULL, wc->byte_len,
recv_buff_ptr);
if (flush_req == NULL) {
log_err("cannot unpack the flush request buffer\n");
if (IS_NOT_THE_LAST_MESSAGE(flush_req)) {
op_ptr = csd->ws_ptr + flush_req->offset;
- pmem_persist(op_ptr, flush_req->length);
+ sd->persist(op_ptr, flush_req->length);
} else {
/*
* This is the last message - the client is done.
{
struct librpma_fio_server_data *csd = td->io_ops_data;
struct server_data *sd = csd->server_data;
- struct rpma_completion *cmpl = &sd->msgs_queued[sd->msg_queued_nr];
+ struct ibv_wc *wc = &sd->msgs_queued[sd->msg_queued_nr];
+ struct librpma_fio_options_values *o = td->eo;
int ret;
- ret = rpma_conn_completion_get(csd->conn, cmpl);
+ ret = rpma_cq_get_wc(csd->cq, 1, wc, NULL);
if (ret == RPMA_E_NO_COMPLETION) {
- /* lack of completion is not an error */
- return 0;
- } else if (ret != 0) {
- librpma_td_verror(td, ret, "rpma_conn_completion_get");
+ if (o->busy_wait_polling)
+ return 0; /* lack of completion is not an error */
+
+ ret = rpma_cq_wait(csd->cq);
+ if (ret == RPMA_E_NO_COMPLETION)
+ return 0; /* lack of completion is not an error */
+ if (ret) {
+ librpma_td_verror(td, ret, "rpma_cq_wait");
+ goto err_terminate;
+ }
+
+ ret = rpma_cq_get_wc(csd->cq, 1, wc, NULL);
+ if (ret == RPMA_E_NO_COMPLETION)
+ return 0; /* lack of completion is not an error */
+ if (ret) {
+ librpma_td_verror(td, ret, "rpma_cq_get_wc");
+ goto err_terminate;
+ }
+ } else if (ret) {
+ librpma_td_verror(td, ret, "rpma_cq_get_wc");
goto err_terminate;
}
/* validate the completion */
- if (cmpl->op_status != IBV_WC_SUCCESS)
+ if (wc->status != IBV_WC_SUCCESS)
goto err_terminate;
- if (cmpl->op == RPMA_OP_RECV)
+ if (wc->opcode == IBV_WC_RECV)
++sd->msg_queued_nr;
- else if (cmpl->op == RPMA_OP_SEND)
+ else if (wc->opcode == IBV_WC_SEND)
++sd->msg_sqe_available;
return 0;