rpma: update RPMA engines with new librpma completions API
[fio.git] / engines / librpma_fio.c
index 810b55e23d522d920492bbeaf282667b4a49d317..dfd8218006c0f0e8d33741b085daf6291c73517e 100644 (file)
@@ -49,6 +49,17 @@ struct fio_option librpma_fio_options[] = {
                .category = FIO_OPT_C_ENGINE,
                .group  = FIO_OPT_G_LIBRPMA,
        },
+       {
+               .name   = "busy_wait_polling",
+               .lname  = "Set to 0 to wait for completion instead of busy-wait polling completion.",
+               .type   = FIO_OPT_BOOL,
+               .off1   = offsetof(struct librpma_fio_options_values,
+                                       busy_wait_polling),
+               .help   = "Set to false if you want to reduce CPU usage",
+               .def    = "1",
+               .category = FIO_OPT_C_ENGINE,
+               .group  = FIO_OPT_G_LIBRPMA,
+       },
        {
                .name   = NULL,
        },
@@ -97,7 +108,7 @@ char *librpma_fio_allocate_dram(struct thread_data *td, size_t size,
        return mem_ptr;
 }
 
-char *librpma_fio_allocate_pmem(struct thread_data *td, const char *filename,
+char *librpma_fio_allocate_pmem(struct thread_data *td, struct fio_file *f,
                size_t size, struct librpma_fio_mem *mem)
 {
        size_t size_mmap = 0;
@@ -111,18 +122,24 @@ char *librpma_fio_allocate_pmem(struct thread_data *td, const char *filename,
                return NULL;
        }
 
-       ws_offset = (td->thread_number - 1) * size;
+       if (f->filetype == FIO_TYPE_CHAR) {
+               /* Each thread uses a separate offset within DeviceDAX. */
+               ws_offset = (td->thread_number - 1) * size;
+       } else {
+               /* Each thread uses a separate FileSystemDAX file. No offset is needed. */
+               ws_offset = 0;
+       }
 
-       if (!filename) {
+       if (!f->file_name) {
                log_err("fio: filename is not set\n");
                return NULL;
        }
 
        /* map the file */
-       mem_ptr = pmem_map_file(filename, 0 /* len */, 0 /* flags */,
+       mem_ptr = pmem_map_file(f->file_name, 0 /* len */, 0 /* flags */,
                        0 /* mode */, &size_mmap, &is_pmem);
        if (mem_ptr == NULL) {
-               log_err("fio: pmem_map_file(%s) failed\n", filename);
+               log_err("fio: pmem_map_file(%s) failed\n", f->file_name);
                /* pmem_map_file() sets errno on failure */
                td_verror(td, errno, "pmem_map_file");
                return NULL;
@@ -131,7 +148,7 @@ char *librpma_fio_allocate_pmem(struct thread_data *td, const char *filename,
        /* pmem is expected */
        if (!is_pmem) {
                log_err("fio: %s is not located in persistent memory\n",
-                       filename);
+                       f->file_name);
                goto err_unmap;
        }
 
@@ -139,12 +156,12 @@ char *librpma_fio_allocate_pmem(struct thread_data *td, const char *filename,
        if (size_mmap < ws_offset + size) {
                log_err(
                        "fio: %s is too small to handle so many threads (%zu < %zu)\n",
-                       filename, size_mmap, ws_offset + size);
+                       f->file_name, size_mmap, ws_offset + size);
                goto err_unmap;
        }
 
        log_info("fio: size of memory mapped from the file %s: %zu\n",
-               filename, size_mmap);
+               f->file_name, size_mmap);
 
        mem->mem_ptr = mem_ptr;
        mem->size_mmap = size_mmap;
@@ -285,6 +302,12 @@ int librpma_fio_client_init(struct thread_data *td,
        if (ccd->conn == NULL)
                goto err_peer_delete;
 
+       /* get the connection's main CQ */
+       if ((ret = rpma_conn_get_cq(ccd->conn, &ccd->cq))) {
+               librpma_td_verror(td, ret, "rpma_conn_get_cq");
+               goto err_conn_delete;
+       }
+
        /* get the connection's private data sent from the server */
        if ((ret = rpma_conn_get_private_data(ccd->conn, &pdata))) {
                librpma_td_verror(td, ret, "rpma_conn_get_private_data");
@@ -438,7 +461,7 @@ static enum fio_q_status client_queue_sync(struct thread_data *td,
                struct io_u *io_u)
 {
        struct librpma_fio_client_data *ccd = td->io_ops_data;
-       struct rpma_completion cmpl;
+       struct ibv_wc wc;
        unsigned io_u_index;
        int ret;
 
@@ -461,31 +484,31 @@ static enum fio_q_status client_queue_sync(struct thread_data *td,
 
        do {
                /* get a completion */
-               ret = rpma_conn_completion_get(ccd->conn, &cmpl);
+               ret = rpma_cq_get_wc(ccd->cq, 1, &wc, NULL);
                if (ret == RPMA_E_NO_COMPLETION) {
                        /* lack of completion is not an error */
                        continue;
                } else if (ret != 0) {
                        /* an error occurred */
-                       librpma_td_verror(td, ret, "rpma_conn_completion_get");
+                       librpma_td_verror(td, ret, "rpma_cq_get_wc");
                        goto err;
                }
 
                /* if io_us has completed with an error */
-               if (cmpl.op_status != IBV_WC_SUCCESS)
+               if (wc.status != IBV_WC_SUCCESS)
                        goto err;
 
-               if (cmpl.op == RPMA_OP_SEND)
+               if (wc.opcode == IBV_WC_SEND)
                        ++ccd->op_send_completed;
                else {
-                       if (cmpl.op == RPMA_OP_RECV)
+                       if (wc.opcode == IBV_WC_RECV)
                                ++ccd->op_recv_completed;
 
                        break;
                }
        } while (1);
 
-       if (ccd->get_io_u_index(&cmpl, &io_u_index) != 1)
+       if (ccd->get_io_u_index(&wc, &io_u_index) != 1)
                goto err;
 
        if (io_u->index != io_u_index) {
@@ -637,8 +660,8 @@ int librpma_fio_client_commit(struct thread_data *td)
 static int client_getevent_process(struct thread_data *td)
 {
        struct librpma_fio_client_data *ccd = td->io_ops_data;
-       struct rpma_completion cmpl;
-       /* io_u->index of completed io_u (cmpl.op_context) */
+       struct ibv_wc wc;
+       /* io_u->index of completed io_u (wc.wr_id) */
        unsigned int io_u_index;
        /* # of completed io_us */
        int cmpl_num = 0;
@@ -648,7 +671,7 @@ static int client_getevent_process(struct thread_data *td)
        int ret;
 
        /* get a completion */
-       if ((ret = rpma_conn_completion_get(ccd->conn, &cmpl))) {
+       if ((ret = rpma_cq_get_wc(ccd->cq, 1, &wc, NULL))) {
                /* lack of completion is not an error */
                if (ret == RPMA_E_NO_COMPLETION) {
                        /* lack of completion is not an error */
@@ -656,22 +679,22 @@ static int client_getevent_process(struct thread_data *td)
                }
 
                /* an error occurred */
-               librpma_td_verror(td, ret, "rpma_conn_completion_get");
+               librpma_td_verror(td, ret, "rpma_cq_get_wc");
                return -1;
        }
 
        /* if io_us has completed with an error */
-       if (cmpl.op_status != IBV_WC_SUCCESS) {
-               td->error = cmpl.op_status;
+       if (wc.status != IBV_WC_SUCCESS) {
+               td->error = wc.status;
                return -1;
        }
 
-       if (cmpl.op == RPMA_OP_SEND)
+       if (wc.opcode == IBV_WC_SEND)
                ++ccd->op_send_completed;
-       else if (cmpl.op == RPMA_OP_RECV)
+       else if (wc.opcode == IBV_WC_RECV)
                ++ccd->op_recv_completed;
 
-       if ((ret = ccd->get_io_u_index(&cmpl, &io_u_index)) != 1)
+       if ((ret = ccd->get_io_u_index(&wc, &io_u_index)) != 1)
                return ret;
 
        /* look for an io_u being completed */
@@ -733,7 +756,7 @@ int librpma_fio_client_getevents(struct thread_data *td, unsigned int min,
 
                        /*
                         * To reduce CPU consumption one can use
-                        * the rpma_conn_completion_wait() function.
+                        * the rpma_cq_wait() function.
                         * Note this greatly increase the latency
                         * and make the results less stable.
                         * The bandwidth stays more or less the same.
@@ -882,6 +905,7 @@ int librpma_fio_server_open_file(struct thread_data *td, struct fio_file *f,
        size_t mem_size = td->o.size;
        size_t mr_desc_size;
        void *ws_ptr;
+       bool is_dram;
        int usage_mem_type;
        int ret;
 
@@ -899,14 +923,14 @@ int librpma_fio_server_open_file(struct thread_data *td, struct fio_file *f,
                return -1;
        }
 
-       if (strcmp(f->file_name, "malloc") == 0) {
+       is_dram = !strcmp(f->file_name, "malloc");
+       if (is_dram) {
                /* allocation from DRAM using posix_memalign() */
                ws_ptr = librpma_fio_allocate_dram(td, mem_size, &csd->mem);
                usage_mem_type = RPMA_MR_USAGE_FLUSH_TYPE_VISIBILITY;
        } else {
                /* allocation from PMEM using pmem_map_file() */
-               ws_ptr = librpma_fio_allocate_pmem(td, f->file_name,
-                               mem_size, &csd->mem);
+               ws_ptr = librpma_fio_allocate_pmem(td, f, mem_size, &csd->mem);
                usage_mem_type = RPMA_MR_USAGE_FLUSH_TYPE_PERSISTENT;
        }
 
@@ -923,6 +947,21 @@ int librpma_fio_server_open_file(struct thread_data *td, struct fio_file *f,
                goto err_free;
        }
 
+       if (!is_dram && f->filetype == FIO_TYPE_FILE) {
+               ret = rpma_mr_advise(mr, 0, mem_size,
+                               IBV_ADVISE_MR_ADVICE_PREFETCH_WRITE,
+                               IBV_ADVISE_MR_FLAG_FLUSH);
+               if (ret) {
+                       librpma_td_verror(td, ret, "rpma_mr_advise");
+                       /* an invalid argument is an error */
+                       if (ret == RPMA_E_INVAL)
+                               goto err_mr_dereg;
+
+                       /* log_err used instead of log_info to avoid corruption of the JSON output */
+                       log_err("Note: having rpma_mr_advise(3) failed because of RPMA_E_NOSUPP or RPMA_E_PROVIDER may come with a performance penalty, but it is not a blocker for running the benchmark.\n");
+               }
+       }
+
        /* get size of the memory region's descriptor */
        if ((ret = rpma_mr_get_descriptor_size(mr, &mr_desc_size))) {
                librpma_td_verror(td, ret, "rpma_mr_get_descriptor_size");
@@ -996,6 +1035,12 @@ int librpma_fio_server_open_file(struct thread_data *td, struct fio_file *f,
        csd->ws_ptr = ws_ptr;
        csd->conn = conn;
 
+       /* get the connection's main CQ */
+       if ((ret = rpma_conn_get_cq(csd->conn, &csd->cq))) {
+               librpma_td_verror(td, ret, "rpma_conn_get_cq");
+               goto err_conn_delete;
+       }
+
        return 0;
 
 err_conn_delete: