2 * librpma_fio: librpma_apm and librpma_gpspm engines' common part.
4 * Copyright 2021-2022, Intel Corporation
6 * This program is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU General Public License,
8 * version 2 as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
16 #ifdef CONFIG_LIBPMEM2_INSTALLED
17 #include "librpma_fio_pmem2.h"
19 #include "librpma_fio_pmem.h"
20 #endif /* CONFIG_LIBPMEM2_INSTALLED */
/*
 * fio option table shared by the librpma_apm and librpma_gpspm engines:
 * serverip, port, direct_write_to_pmem and busy_wait_polling.
 * NOTE(review): this view is missing lines — the per-entry braces, the
 * .name fields of the first two options, the .off1 target of
 * busy_wait_polling, and the terminating empty entry are not visible here.
 */
22 struct fio_option librpma_fio_options[] = {
25 .lname = "rpma_server_ip",
26 .type = FIO_OPT_STR_STORE,
27 .off1 = offsetof(struct librpma_fio_options_values, server_ip),
28 .help = "IP address the server is listening on",
30 .category = FIO_OPT_C_ENGINE,
31 .group = FIO_OPT_G_LIBRPMA,
35 .lname = "rpma_server port",
36 .type = FIO_OPT_STR_STORE,
37 .off1 = offsetof(struct librpma_fio_options_values, port),
38 .help = "port the server is listening on",
40 .category = FIO_OPT_C_ENGINE,
41 .group = FIO_OPT_G_LIBRPMA,
44 .name = "direct_write_to_pmem",
45 .lname = "Direct Write to PMem (via RDMA) from the remote host is possible",
47 .off1 = offsetof(struct librpma_fio_options_values,
48 direct_write_to_pmem),
49 .help = "Set to true ONLY when Direct Write to PMem from the remote host is possible (https://pmem.io/rpma/documentation/basic-direct-write-to-pmem.html)",
51 .category = FIO_OPT_C_ENGINE,
52 .group = FIO_OPT_G_LIBRPMA,
55 .name = "busy_wait_polling",
56 .lname = "Set to 0 to wait for completion instead of busy-wait polling completion.",
58 .off1 = offsetof(struct librpma_fio_options_values,
60 .help = "Set to false if you want to reduce CPU usage",
62 .category = FIO_OPT_C_ENGINE,
63 .group = FIO_OPT_G_LIBRPMA,
/*
 * Derive a per-thread port string: base port + (thread_number - 1),
 * written into port_out. Returns 0 on success, non-zero on error
 * (reported via td_verror()/log_err()).
 * NOTE(review): errno does not appear to be reset before strtoul(), so the
 * ULONG_MAX check may report a stale errno — confirm against the full file.
 */
70 int librpma_fio_td_port(const char *port_base_str, struct thread_data *td,
73 unsigned long int port_ul = strtoul(port_base_str, NULL, 10);
74 unsigned int port_new;
/* overflow of the numeric conversion */
78 if (port_ul == ULONG_MAX) {
79 td_verror(td, errno, "strtoul");
/* each thread gets its own port: base + thread_number - 1 */
82 port_ul += td->thread_number - 1;
83 if (port_ul >= UINT_MAX) {
84 log_err("[%u] port number (%lu) bigger than UINT_MAX\n",
85 td->thread_number, port_ul);
/*
 * NOTE(review): snprintf() already reserves room for the terminating NUL
 * within the given size, so LIBRPMA_FIO_PORT_STR_LEN_MAX (not -1) would
 * use the full buffer; the -1 merely wastes one byte, it is not a bug.
 */
90 snprintf(port_out, LIBRPMA_FIO_PORT_STR_LEN_MAX - 1, "%u", port_new);
/*
 * Allocate a page-aligned DRAM workspace of `size` bytes via
 * posix_memalign() and record the pointer in mem->mem_ptr.
 * Returns the allocated pointer (error/return paths not visible here).
 */
95 char *librpma_fio_allocate_dram(struct thread_data *td, size_t size,
96 struct librpma_fio_mem *mem)
101 if ((ret = posix_memalign((void **)&mem_ptr, page_size, size))) {
102 log_err("fio: posix_memalign() failed\n");
/* posix_memalign() returns the error code directly (does not set errno) */
103 td_verror(td, ret, "posix_memalign");
107 mem->mem_ptr = mem_ptr;
/*
 * Map a PMem workspace of `size` bytes from file f. For a DeviceDAX
 * character device all threads share the device, so each thread uses its
 * own offset (thread_number - 1) * size; for FileSystemDAX each thread
 * has a separate file and no offset is needed.
 * Returns a pointer into the mapping at ws_offset, or NULL on failure.
 */
113 char *librpma_fio_allocate_pmem(struct thread_data *td, struct fio_file *f,
114 size_t size, struct librpma_fio_mem *mem)
/* the workspace size must be page-aligned to be RDMA-registrable */
119 if (size % page_size) {
120 log_err("fio: size (%zu) is not aligned to page size (%zu)\n",
125 if (f->filetype == FIO_TYPE_CHAR) {
126 /* Each thread uses a separate offset within DeviceDAX. */
127 ws_offset = (td->thread_number - 1) * size;
129 /* Each thread uses a separate FileSystemDAX file. No offset is needed. */
134 log_err("fio: filename is not set\n");
138 if (librpma_fio_pmem_map_file(f, size, mem, ws_offset)) {
139 log_err("fio: librpma_fio_pmem_map_file(%s) failed\n",
144 log_info("fio: size of memory mapped from the file %s: %zu\n",
145 f->file_name, mem->size_mmap);
147 log_info("fio: library used to map PMem from file: %s\n", RPMA_PMEM_USED);
149 return mem->mem_ptr ? mem->mem_ptr + ws_offset : NULL;
/* Release a workspace allocated by the allocate_dram/allocate_pmem pair. */
152 void librpma_fio_free(struct librpma_fio_mem *mem)
155 librpma_fio_unmap(mem);
/* connection-establishment retry policy: up to 10 attempts, 5 s apart */
160 #define LIBRPMA_FIO_RETRY_MAX_NO 10
161 #define LIBRPMA_FIO_RETRY_DELAY_S 5
/*
 * Common client-side initialization for both librpma engines:
 * - raise librpma logging thresholds (more if --debug=net is set),
 * - resolve the server IP to an IBV context,
 * - allocate the client data (ccd) and the three io_u software queues
 *   (queued/flight/completed, each iodepth entries),
 * - create a peer and connect to server_ip:port (with retries on
 *   RPMA_CONN_REJECTED),
 * - fetch the CQ, the server's private data (workspace description),
 *   the remote memory region, its size and flush type,
 * - force page-aligned io_u buffers and publish ccd in td->io_ops_data.
 * Returns 0 on success; on failure unwinds via the goto cleanup labels.
 * NOTE(review): several lines (returns, some braces, the ws assignment
 * from pdata, label names) are missing from this view.
 */
163 int librpma_fio_client_init(struct thread_data *td,
164 struct rpma_conn_cfg *cfg)
166 struct librpma_fio_client_data *ccd;
167 struct librpma_fio_options_values *o = td->eo;
168 struct ibv_context *dev = NULL;
169 char port_td[LIBRPMA_FIO_PORT_STR_LEN_MAX];
170 struct rpma_conn_req *req = NULL;
171 enum rpma_conn_event event;
172 struct rpma_conn_private_data pdata;
173 enum rpma_log_level log_level_aux = RPMA_LOG_LEVEL_WARNING;
174 int remote_flush_type;
178 /* --debug=net sets RPMA_LOG_THRESHOLD_AUX to RPMA_LOG_LEVEL_INFO */
180 if ((1UL << FD_NET) & fio_debug)
181 log_level_aux = RPMA_LOG_LEVEL_INFO;
184 /* configure logging thresholds to see more details */
185 rpma_log_set_threshold(RPMA_LOG_THRESHOLD, RPMA_LOG_LEVEL_INFO);
186 rpma_log_set_threshold(RPMA_LOG_THRESHOLD_AUX, log_level_aux);
188 /* obtain an IBV context for a remote IP address */
189 if ((ret = rpma_utils_get_ibv_context(o->server_ip,
190 RPMA_UTIL_IBV_CONTEXT_REMOTE, &dev))) {
191 librpma_td_verror(td, ret, "rpma_utils_get_ibv_context");
195 /* allocate client's data */
196 ccd = calloc(1, sizeof(*ccd));
198 td_verror(td, errno, "calloc");
202 /* allocate all in-memory queues */
203 ccd->io_us_queued = calloc(td->o.iodepth, sizeof(*ccd->io_us_queued));
204 if (ccd->io_us_queued == NULL) {
205 td_verror(td, errno, "calloc");
209 ccd->io_us_flight = calloc(td->o.iodepth, sizeof(*ccd->io_us_flight));
210 if (ccd->io_us_flight == NULL) {
211 td_verror(td, errno, "calloc");
212 goto err_free_io_u_queues;
215 ccd->io_us_completed = calloc(td->o.iodepth,
216 sizeof(*ccd->io_us_completed));
217 if (ccd->io_us_completed == NULL) {
218 td_verror(td, errno, "calloc");
219 goto err_free_io_u_queues;
222 /* create a new peer object */
223 if ((ret = rpma_peer_new(dev, &ccd->peer))) {
224 librpma_td_verror(td, ret, "rpma_peer_new");
225 goto err_free_io_u_queues;
228 /* create a connection request */
229 if (librpma_fio_td_port(o->port, td, port_td))
230 goto err_peer_delete;
/* a rejected connection is retried up to LIBRPMA_FIO_RETRY_MAX_NO times */
232 for (retry = 0; retry < LIBRPMA_FIO_RETRY_MAX_NO; retry++) {
233 if ((ret = rpma_conn_req_new(ccd->peer, o->server_ip, port_td,
235 librpma_td_verror(td, ret, "rpma_conn_req_new");
236 goto err_peer_delete;
240 * Connect the connection request
241 * and obtain the connection object.
243 if ((ret = rpma_conn_req_connect(&req, NULL, &ccd->conn))) {
244 librpma_td_verror(td, ret, "rpma_conn_req_connect");
248 /* wait for the connection to establish */
249 if ((ret = rpma_conn_next_event(ccd->conn, &event))) {
250 librpma_td_verror(td, ret, "rpma_conn_next_event");
251 goto err_conn_delete;
252 } else if (event == RPMA_CONN_ESTABLISHED) {
254 } else if (event == RPMA_CONN_REJECTED) {
255 (void) rpma_conn_disconnect(ccd->conn);
256 (void) rpma_conn_delete(&ccd->conn);
257 if (retry < LIBRPMA_FIO_RETRY_MAX_NO - 1) {
258 log_err("Thread [%d]: Retrying (#%i) ...\n",
259 td->thread_number, retry + 1);
260 sleep(LIBRPMA_FIO_RETRY_DELAY_S);
263 "Thread [%d]: The maximum number of retries exceeded. Closing.\n",
/* NOTE(review): "unexptected" typo in this runtime string — fix upstream */
268 "rpma_conn_next_event returned an unexptected event: (%s != RPMA_CONN_ESTABLISHED)\n",
269 rpma_utils_conn_event_2str(event));
270 goto err_conn_delete;
275 log_err("Thread [%d]: Connected after retry #%i\n",
276 td->thread_number, retry);
278 if (ccd->conn == NULL)
279 goto err_peer_delete;
281 /* get the connection's main CQ */
282 if ((ret = rpma_conn_get_cq(ccd->conn, &ccd->cq))) {
283 librpma_td_verror(td, ret, "rpma_conn_get_cq");
284 goto err_conn_delete;
287 /* get the connection's private data sent from the server */
288 if ((ret = rpma_conn_get_private_data(ccd->conn, &pdata))) {
289 librpma_td_verror(td, ret, "rpma_conn_get_private_data");
290 goto err_conn_delete;
293 /* get the server's workspace representation */
296 /* create the server's memory representation */
297 if ((ret = rpma_mr_remote_from_descriptor(&ccd->ws->descriptor[0],
298 ccd->ws->mr_desc_size, &ccd->server_mr))) {
299 librpma_td_verror(td, ret, "rpma_mr_remote_from_descriptor");
300 goto err_conn_delete;
303 /* get the total size of the shared server memory */
304 if ((ret = rpma_mr_remote_get_size(ccd->server_mr, &ccd->ws_size))) {
305 librpma_td_verror(td, ret, "rpma_mr_remote_get_size");
306 goto err_conn_delete;
309 /* get flush type of the remote node */
310 if ((ret = rpma_mr_remote_get_flush_type(ccd->server_mr,
311 &remote_flush_type))) {
312 librpma_td_verror(td, ret, "rpma_mr_remote_get_flush_type");
313 goto err_conn_delete;
/* prefer a persistent flush when the remote MR supports it */
316 ccd->server_mr_flush_type =
317 (remote_flush_type & RPMA_MR_USAGE_FLUSH_TYPE_PERSISTENT) ?
318 RPMA_FLUSH_TYPE_PERSISTENT : RPMA_FLUSH_TYPE_VISIBILITY;
321 * Assure an io_us buffer allocation is page-size-aligned which is required
322 * to register for RDMA. User-provided value is intentionally ignored.
324 td->o.mem_align = page_size;
326 td->io_ops_data = ccd;
/* error-unwind path: connection -> request -> peer -> queues */
331 (void) rpma_conn_disconnect(ccd->conn);
332 (void) rpma_conn_delete(&ccd->conn);
335 (void) rpma_conn_req_delete(&req);
338 (void) rpma_peer_delete(&ccd->peer);
340 err_free_io_u_queues:
341 free(ccd->io_us_queued);
342 free(ccd->io_us_flight);
343 free(ccd->io_us_completed);
/*
 * Tear down everything librpma_fio_client_init() set up: deregister the
 * io_u memory, delete the remote MR representation, disconnect and delete
 * the connection, delete the peer, and free the software queues.
 * Errors are logged but cleanup continues regardless.
 */
351 void librpma_fio_client_cleanup(struct thread_data *td)
353 struct librpma_fio_client_data *ccd = td->io_ops_data;
354 enum rpma_conn_event ev;
360 /* delete the iou's memory registration */
361 if ((ret = rpma_mr_dereg(&ccd->orig_mr)))
362 librpma_td_verror(td, ret, "rpma_mr_dereg");
363 /* delete the server's memory representation */
364 if ((ret = rpma_mr_remote_delete(&ccd->server_mr)))
365 librpma_td_verror(td, ret, "rpma_mr_remote_delete");
366 /* initiate disconnection */
367 if ((ret = rpma_conn_disconnect(ccd->conn)))
368 librpma_td_verror(td, ret, "rpma_conn_disconnect");
369 /* wait for disconnection to end up */
370 if ((ret = rpma_conn_next_event(ccd->conn, &ev))) {
371 librpma_td_verror(td, ret, "rpma_conn_next_event");
372 } else if (ev != RPMA_CONN_CLOSED) {
374 "client_cleanup received an unexpected event (%s != RPMA_CONN_CLOSED)\n",
375 rpma_utils_conn_event_2str(ev));
377 /* delete the connection */
378 if ((ret = rpma_conn_delete(&ccd->conn)))
379 librpma_td_verror(td, ret, "rpma_conn_delete");
380 /* delete the peer */
381 if ((ret = rpma_peer_delete(&ccd->peer)))
382 librpma_td_verror(td, ret, "rpma_peer_delete");
383 /* free the software queues */
384 free(ccd->io_us_queued);
385 free(ccd->io_us_flight);
386 free(ccd->io_us_completed);
388 td->io_ops_data = NULL; /* zero ccd */
/*
 * No-op open/close file hook — the engines manage the "file" (remote or
 * local workspace) themselves, so fio's per-file open/close does nothing.
 */
391 int librpma_fio_file_nop(struct thread_data *td, struct fio_file *f)
/*
 * Register the io_u buffer pool for RDMA after fio has allocated
 * td->orig_buffer: align the buffer up to a page boundary, compute the
 * total io_us size (max block size * iodepth), and rpma_mr_reg() it for
 * reads, writes and persistent flushes. Returns the rpma_mr_reg() status
 * (0 on success).
 */
397 int librpma_fio_client_post_init(struct thread_data *td)
399 struct librpma_fio_client_data *ccd = td->io_ops_data;
404 * td->orig_buffer is not aligned. The engine requires aligned io_us
405 * so FIO aligns up the address using the formula below.
407 ccd->orig_buffer_aligned = PTR_ALIGN(td->orig_buffer, page_mask) +
411 * td->orig_buffer_size beside the space really consumed by io_us
412 * has paddings which can be omitted for the memory registration.
414 io_us_size = (unsigned long long)td_max_bs(td) *
415 (unsigned long long)td->o.iodepth;
417 if ((ret = rpma_mr_reg(ccd->peer, ccd->orig_buffer_aligned, io_us_size,
418 RPMA_MR_USAGE_READ_DST | RPMA_MR_USAGE_READ_SRC |
419 RPMA_MR_USAGE_WRITE_DST | RPMA_MR_USAGE_WRITE_SRC |
420 RPMA_MR_USAGE_FLUSH_TYPE_PERSISTENT, &ccd->orig_mr)))
421 librpma_td_verror(td, ret, "rpma_mr_reg");
/*
 * The client's "file" is the server's exported workspace — report its
 * size (obtained during init) as the real file size.
 */
425 int librpma_fio_client_get_file_size(struct thread_data *td,
428 struct librpma_fio_client_data *ccd = td->io_ops_data;
430 f->real_file_size = ccd->ws_size;
431 fio_file_set_size_known(f);
/*
 * Synchronous queue path (sync=1): post a single read, or a write
 * followed by a flush, then busy-poll the CQ until its completion
 * arrives, verify the completion matches this io_u, and drain any
 * outstanding SEND completions before returning FIO_Q_COMPLETED.
 * NOTE(review): the polling loop construct and error-return lines are
 * missing from this view.
 */
436 static enum fio_q_status client_queue_sync(struct thread_data *td,
439 struct librpma_fio_client_data *ccd = td->io_ops_data;
445 if (io_u->ddir == DDIR_READ) {
446 /* post an RDMA read operation */
447 if (librpma_fio_client_io_read(td, io_u,
448 RPMA_F_COMPLETION_ALWAYS))
450 } else if (io_u->ddir == DDIR_WRITE) {
451 /* post an RDMA write operation */
452 if (librpma_fio_client_io_write(td, io_u))
/* a single-io_u flush (first == last == io_u) completes the write */
454 if (ccd->flush(td, io_u, io_u, io_u->xfer_buflen))
457 log_err("unsupported IO mode: %s\n", io_ddir_name(io_u->ddir));
462 /* get a completion */
463 ret = rpma_cq_get_wc(ccd->cq, 1, &wc, NULL);
464 if (ret == RPMA_E_NO_COMPLETION) {
465 /* lack of completion is not an error */
467 } else if (ret != 0) {
468 /* an error occurred */
469 librpma_td_verror(td, ret, "rpma_cq_get_wc");
473 /* if io_us has completed with an error */
474 if (wc.status != IBV_WC_SUCCESS)
/* GPSPM messaging bookkeeping: count SEND/RECV completions */
477 if (wc.opcode == IBV_WC_SEND)
478 ++ccd->op_send_completed;
480 if (wc.opcode == IBV_WC_RECV)
481 ++ccd->op_recv_completed;
487 if (ccd->get_io_u_index(&wc, &io_u_index) != 1)
490 if (io_u->index != io_u_index) {
492 "no matching io_u for received completion found (io_u_index=%u)\n",
497 /* make sure all SENDs are completed before exit - clean up SQ */
498 if (librpma_fio_client_io_complete_all_sends(td))
501 return FIO_Q_COMPLETED;
505 return FIO_Q_COMPLETED;
/*
 * fio queue hook: reject when the queued[] array is full (iodepth
 * reached), use the synchronous path when configured, otherwise just
 * append the io_u to queued[] — actual submission happens in commit().
 */
508 enum fio_q_status librpma_fio_client_queue(struct thread_data *td,
511 struct librpma_fio_client_data *ccd = td->io_ops_data;
513 if (ccd->io_u_queued_nr == (int)td->o.iodepth)
517 return client_queue_sync(td, io_u);
519 /* io_u -> queued[] */
520 ccd->io_us_queued[ccd->io_u_queued_nr] = io_u;
521 ccd->io_u_queued_nr++;
/*
 * fio commit hook: execute every io_u accumulated in queued[].
 * Reads are posted with COMPLETION_ON_ERROR unless the next io_u is a
 * write (or it is the last one), in which case COMPLETION_ALWAYS is
 * requested. Sequential writes are coalesced into one flush covering the
 * whole continuous range; random writes are flushed one by one. Executed
 * io_us are then moved from queued[] to flight[], issue times stamped,
 * and io_u_queued()/io_u_mark_submit() reported to fio.
 * Returns 0 on success (error-return lines not visible in this view).
 */
526 int librpma_fio_client_commit(struct thread_data *td)
528 struct librpma_fio_client_data *ccd = td->io_ops_data;
529 int flags = RPMA_F_COMPLETION_ON_ERROR;
533 struct io_u *flush_first_io_u = NULL;
534 unsigned long long int flush_len = 0;
536 if (!ccd->io_us_queued)
539 /* execute all io_us from queued[] */
540 for (i = 0; i < ccd->io_u_queued_nr; i++) {
541 struct io_u *io_u = ccd->io_us_queued[i];
543 if (io_u->ddir == DDIR_READ) {
/* request a completion for the last read of a read sequence */
544 if (i + 1 == ccd->io_u_queued_nr ||
545 ccd->io_us_queued[i + 1]->ddir == DDIR_WRITE)
546 flags = RPMA_F_COMPLETION_ALWAYS;
547 /* post an RDMA read operation */
548 if (librpma_fio_client_io_read(td, io_u, flags))
550 } else if (io_u->ddir == DDIR_WRITE) {
551 /* post an RDMA write operation */
552 if (librpma_fio_client_io_write(td, io_u))
555 /* cache the first io_u in the sequence */
556 if (flush_first_io_u == NULL)
557 flush_first_io_u = io_u;
560 * the flush length is the sum of all io_u's creating
563 flush_len += io_u->xfer_buflen;
566 * if io_u's are random the rpma_flush is required
567 * after each one of them
569 if (!td_random(td)) {
571 * When the io_u's are sequential and
572 * the current io_u is not the last one and
573 * the next one is also a write operation
574 * the flush can be postponed by one io_u and
575 * cover all of them which build a continuous
578 if ((i + 1 < ccd->io_u_queued_nr) &&
579 (ccd->io_us_queued[i + 1]->ddir == DDIR_WRITE))
583 /* flush all writes which build a continuous sequence */
584 if (ccd->flush(td, flush_first_io_u, io_u, flush_len))
588 * reset the flush parameters in preparation for
591 flush_first_io_u = NULL;
594 log_err("unsupported IO mode: %s\n",
595 io_ddir_name(io_u->ddir));
/* stamp issue time once for the whole batch (iolog bookkeeping) */
600 if ((fill_time = fio_fill_issue_time(td))) {
601 fio_gettime(&now, NULL);
604 * only used for iolog
606 if (td->o.read_iolog_file)
607 memcpy(&td->last_issue, &now, sizeof(now));
610 /* move executed io_us from queued[] to flight[] */
611 for (i = 0; i < ccd->io_u_queued_nr; i++) {
612 struct io_u *io_u = ccd->io_us_queued[i];
614 /* FIO does not do this if the engine is asynchronous */
616 memcpy(&io_u->issue_time, &now, sizeof(now));
618 /* move executed io_us from queued[] to flight[] */
619 ccd->io_us_flight[ccd->io_u_flight_nr] = io_u;
620 ccd->io_u_flight_nr++;
624 * If an engine has the commit hook
625 * it has to call io_u_queued() itself.
627 io_u_queued(td, io_u);
630 /* FIO does not do this if an engine has the commit hook. */
631 io_u_mark_submit(td, ccd->io_u_queued_nr);
632 ccd->io_u_queued_nr = 0;
/*
 * Poll the CQ once and account the result. Returns:
 */
639 * - > 0 - a number of completed io_us
640 * - 0 - when no completions received
641 * - (-1) - when an error occurred
643 static int client_getevent_process(struct thread_data *td)
645 struct librpma_fio_client_data *ccd = td->io_ops_data;
647 /* io_u->index of completed io_u (wc.wr_id) */
648 unsigned int io_u_index;
649 /* # of completed io_us */
656 /* get a completion */
657 if ((ret = rpma_cq_get_wc(ccd->cq, 1, &wc, NULL))) {
658 /* lack of completion is not an error */
659 if (ret == RPMA_E_NO_COMPLETION) {
660 /* lack of completion is not an error */
664 /* an error occurred */
665 librpma_td_verror(td, ret, "rpma_cq_get_wc");
669 /* if io_us has completed with an error */
670 if (wc.status != IBV_WC_SUCCESS) {
/* NOTE(review): wc.status (ibv_wc_status) is stored directly in td->error
 * and later decoded by librpma_fio_client_errdetails() */
671 td->error = wc.status;
675 if (wc.opcode == IBV_WC_SEND)
676 ++ccd->op_send_completed;
677 else if (wc.opcode == IBV_WC_RECV)
678 ++ccd->op_recv_completed;
680 if ((ret = ccd->get_io_u_index(&wc, &io_u_index)) != 1)
683 /* look for an io_u being completed */
684 for (i = 0; i < ccd->io_u_flight_nr; ++i) {
685 if (ccd->io_us_flight[i]->index == io_u_index) {
691 /* if no matching io_u has been found */
694 "no matching io_u for received completion found (io_u_index=%u)\n",
699 /* move completed io_us to the completed in-memory queue */
700 for (i = 0; i < cmpl_num; ++i) {
701 /* get and prepare io_u */
702 io_u = ccd->io_us_flight[i];
704 /* append to the queue */
705 ccd->io_us_completed[ccd->io_u_completed_nr] = io_u;
706 ccd->io_u_completed_nr++;
709 /* remove completed io_us from the flight queue */
710 for (i = cmpl_num; i < ccd->io_u_flight_nr; ++i)
711 ccd->io_us_flight[i - cmpl_num] = ccd->io_us_flight[i];
712 ccd->io_u_flight_nr -= cmpl_num;
/*
 * fio getevents hook: busy-poll client_getevent_process() until at least
 * `min` io_us have completed AND every RECV completion has a matching
 * SEND completion (required for GPSPM message pacing). Returns the total
 * number of completed io_us.
 * NOTE(review): cmpl_num_total is a signed int compared against the
 * unsigned `min`/`max` — verify the implicit conversions are intended.
 */
717 int librpma_fio_client_getevents(struct thread_data *td, unsigned int min,
718 unsigned int max, const struct timespec *t)
720 struct librpma_fio_client_data *ccd = td->io_ops_data;
721 /* total # of completed io_us */
722 int cmpl_num_total = 0;
723 /* # of completed io_us from a single event */
727 cmpl_num = client_getevent_process(td);
729 /* new completions collected */
730 cmpl_num_total += cmpl_num;
731 } else if (cmpl_num == 0) {
733 * It is required to make sure that CQEs for SENDs
734 * will flow at least at the same pace as CQEs for RECVs.
736 if (cmpl_num_total >= min &&
737 ccd->op_send_completed >= ccd->op_recv_completed)
741 * To reduce CPU consumption one can use
742 * the rpma_cq_wait() function.
743 * Note this greatly increase the latency
744 * and make the results less stable.
745 * The bandwidth stays more or less the same.
748 /* an error occurred */
753 * The expected max can be exceeded if CQEs for RECVs will come up
754 * faster than CQEs for SENDs. But it is required to make sure CQEs for
755 * SENDs will flow at least at the same pace as CQEs for RECVs.
757 } while (cmpl_num_total < max ||
758 ccd->op_send_completed < ccd->op_recv_completed);
761 * All posted SENDs are completed and RECVs for them (responses) are
762 * completed. This is the initial situation so the counters are reset.
764 if (ccd->op_send_posted == ccd->op_send_completed &&
765 ccd->op_send_completed == ccd->op_recv_completed) {
766 ccd->op_send_posted = 0;
767 ccd->op_send_completed = 0;
768 ccd->op_recv_completed = 0;
771 return cmpl_num_total;
/*
 * fio event hook: pop and return the oldest io_u from the completed[]
 * queue, shifting the remaining entries down by one.
 * NOTE(review): this is O(n) per event; acceptable at typical iodepths.
 */
774 struct io_u *librpma_fio_client_event(struct thread_data *td, int event)
776 struct librpma_fio_client_data *ccd = td->io_ops_data;
780 /* get the first io_u from the queue */
781 io_u = ccd->io_us_completed[0];
783 /* remove the first io_u from the queue */
784 for (i = 1; i < ccd->io_u_completed_nr; ++i)
785 ccd->io_us_completed[i - 1] = ccd->io_us_completed[i];
786 ccd->io_u_completed_nr--;
788 dprint_io_u(io_u, "client_event");
/*
 * fio errdetails hook: translate the ibv_wc_status stashed in io_u->error
 * into a heap-allocated human-readable string (fio takes ownership and
 * frees it). On strdup() failure the error is printed and the process
 * aborts (continuation not visible in this view).
 */
793 char *librpma_fio_client_errdetails(struct io_u *io_u)
795 /* get the string representation of an error */
796 enum ibv_wc_status status = io_u->error;
797 const char *status_str = ibv_wc_status_str(status);
799 char *details = strdup(status_str);
800 if (details == NULL) {
801 fprintf(stderr, "Error: %s\n", status_str);
802 fprintf(stderr, "Fatal error: out of memory. Aborting.\n");
806 /* FIO frees the returned string when it becomes obsolete */
/*
 * Common server-side initialization: raise librpma logging thresholds,
 * resolve the local listening IP to an IBV context, allocate the server
 * data (csd), create the peer, and publish csd in td->io_ops_data.
 * Returns 0 on success (error-return lines not visible in this view).
 */
810 int librpma_fio_server_init(struct thread_data *td)
812 struct librpma_fio_options_values *o = td->eo;
813 struct librpma_fio_server_data *csd;
814 struct ibv_context *dev = NULL;
815 enum rpma_log_level log_level_aux = RPMA_LOG_LEVEL_WARNING;
818 /* --debug=net sets RPMA_LOG_THRESHOLD_AUX to RPMA_LOG_LEVEL_INFO */
820 if ((1UL << FD_NET) & fio_debug)
821 log_level_aux = RPMA_LOG_LEVEL_INFO;
824 /* configure logging thresholds to see more details */
825 rpma_log_set_threshold(RPMA_LOG_THRESHOLD, RPMA_LOG_LEVEL_INFO);
826 rpma_log_set_threshold(RPMA_LOG_THRESHOLD_AUX, log_level_aux);
/* note LOCAL here vs REMOTE on the client side */
829 /* obtain an IBV context for a remote IP address */
830 if ((ret = rpma_utils_get_ibv_context(o->server_ip,
831 RPMA_UTIL_IBV_CONTEXT_LOCAL, &dev))) {
832 librpma_td_verror(td, ret, "rpma_utils_get_ibv_context");
836 /* allocate server's data */
837 csd = calloc(1, sizeof(*csd));
839 td_verror(td, errno, "calloc");
843 /* create a new peer object */
844 if ((ret = rpma_peer_new(dev, &csd->peer))) {
845 librpma_td_verror(td, ret, "rpma_peer_new");
849 td->io_ops_data = csd;
/* Tear down the server data: delete the peer (csd free not visible here). */
859 void librpma_fio_server_cleanup(struct thread_data *td)
861 struct librpma_fio_server_data *csd = td->io_ops_data;
868 if ((ret = rpma_peer_delete(&csd->peer)))
869 librpma_td_verror(td, ret, "rpma_peer_delete");
/*
 * Server-side "open": publish the workspace and accept one client.
 * - start listening on server_ip:per-thread-port,
 * - allocate the workspace from DRAM (filename "malloc") or PMem,
 * - register it for RDMA (flush type depends on the memory kind),
 * - for FileSystemDAX, advise the MR for prefetch-write (best effort),
 * - build the workspace description (descriptor, size, msg limits,
 *   direct_write_to_pmem) and send it as connection private data,
 * - accept the incoming connection request, wait for ESTABLISHED,
 *   shut the endpoint down, and fetch the connection's main CQ.
 * Returns 0 on success; errors unwind through the goto labels.
 * NOTE(review): many lines (returns, some assignments such as
 * csd->conn/csd->ws_mr, label names) are missing from this view.
 */
874 int librpma_fio_server_open_file(struct thread_data *td, struct fio_file *f,
875 struct rpma_conn_cfg *cfg)
877 struct librpma_fio_server_data *csd = td->io_ops_data;
878 struct librpma_fio_options_values *o = td->eo;
879 enum rpma_conn_event conn_event = RPMA_CONN_UNDEFINED;
880 struct librpma_fio_workspace ws = {0};
881 struct rpma_conn_private_data pdata;
882 uint32_t max_msg_num;
883 struct rpma_conn_req *conn_req;
884 struct rpma_conn *conn;
885 struct rpma_mr_local *mr;
886 char port_td[LIBRPMA_FIO_PORT_STR_LEN_MAX];
888 size_t mem_size = td->o.size;
896 log_err("fio: filename is not set\n");
900 /* start a listening endpoint at addr:port */
901 if (librpma_fio_td_port(o->port, td, port_td))
904 if ((ret = rpma_ep_listen(csd->peer, o->server_ip, port_td, &ep))) {
905 librpma_td_verror(td, ret, "rpma_ep_listen");
/* the literal filename "malloc" selects a DRAM-backed workspace */
909 is_dram = !strcmp(f->file_name, "malloc");
911 /* allocation from DRAM using posix_memalign() */
912 ws_ptr = librpma_fio_allocate_dram(td, mem_size, &csd->mem);
913 usage_mem_type = RPMA_MR_USAGE_FLUSH_TYPE_VISIBILITY;
915 /* allocation from PMEM using pmem_map_file() */
916 ws_ptr = librpma_fio_allocate_pmem(td, f, mem_size, &csd->mem);
917 usage_mem_type = RPMA_MR_USAGE_FLUSH_TYPE_PERSISTENT;
921 goto err_ep_shutdown;
923 f->real_file_size = mem_size;
925 if ((ret = rpma_mr_reg(csd->peer, ws_ptr, mem_size,
926 RPMA_MR_USAGE_READ_DST | RPMA_MR_USAGE_READ_SRC |
927 RPMA_MR_USAGE_WRITE_DST | RPMA_MR_USAGE_WRITE_SRC |
928 usage_mem_type, &mr))) {
929 librpma_td_verror(td, ret, "rpma_mr_reg");
/* FileSystemDAX only: advise the MR for remote flushes (best effort) */
933 if (!is_dram && f->filetype == FIO_TYPE_FILE) {
934 ret = rpma_mr_advise(mr, 0, mem_size,
935 IBV_ADVISE_MR_ADVICE_PREFETCH_WRITE,
936 IBV_ADVISE_MR_FLAG_FLUSH);
938 librpma_td_verror(td, ret, "rpma_mr_advise");
939 /* an invalid argument is an error */
940 if (ret == RPMA_E_INVAL)
943 /* log_err used instead of log_info to avoid corruption of the JSON output */
944 log_err("Note: having rpma_mr_advise(3) failed because of RPMA_E_NOSUPP or RPMA_E_PROVIDER may come with a performance penalty, but it is not a blocker for running the benchmark.\n")
;
948 /* get size of the memory region's descriptor */
949 if ((ret = rpma_mr_get_descriptor_size(mr, &mr_desc_size))) {
950 librpma_td_verror(td, ret, "rpma_mr_get_descriptor_size");
954 /* verify size of the memory region's descriptor */
955 if (mr_desc_size > LIBRPMA_FIO_DESCRIPTOR_MAX_SIZE) {
957 "size of the memory region's descriptor is too big (max=%i)\n",
958 LIBRPMA_FIO_DESCRIPTOR_MAX_SIZE);
962 /* get the memory region's descriptor */
963 if ((ret = rpma_mr_get_descriptor(mr, &ws.descriptor[0]))) {
964 librpma_td_verror(td, ret, "rpma_mr_get_descriptor");
/* RQ size bounds how many messages a client may keep in flight */
969 if ((ret = rpma_conn_cfg_get_rq_size(cfg, &max_msg_num))) {
970 librpma_td_verror(td, ret, "rpma_conn_cfg_get_rq_size");
974 /* verify whether iodepth fits into uint16_t */
975 if (max_msg_num > UINT16_MAX) {
976 log_err("fio: iodepth too big (%u > %u)\n",
977 max_msg_num, UINT16_MAX);
981 ws.max_msg_num = max_msg_num;
984 /* prepare a workspace description */
985 ws.direct_write_to_pmem = o->direct_write_to_pmem;
986 ws.mr_desc_size = mr_desc_size;
988 pdata.len = sizeof(ws);
990 /* receive an incoming connection request */
991 if ((ret = rpma_ep_next_conn_req(ep, cfg, &conn_req))) {
992 librpma_td_verror(td, ret, "rpma_ep_next_conn_req");
/* engine-specific hook (e.g. GPSPM posts RECVs before accepting) */
996 if (csd->prepare_connection && csd->prepare_connection(td, conn_req))
999 /* accept the connection request and obtain the connection object */
1000 if ((ret = rpma_conn_req_connect(&conn_req, &pdata, &conn))) {
1001 librpma_td_verror(td, ret, "rpma_conn_req_connect");
1002 goto err_req_delete;
1005 /* wait for the connection to be established */
1006 if ((ret = rpma_conn_next_event(conn, &conn_event))) {
1007 librpma_td_verror(td, ret, "rpma_conn_next_event");
1008 goto err_conn_delete;
1009 } else if (conn_event != RPMA_CONN_ESTABLISHED) {
1010 log_err("rpma_conn_next_event returned an unexptected event\n");
1011 goto err_conn_delete;
1014 /* end-point is no longer needed */
1015 (void) rpma_ep_shutdown(&ep);
1018 csd->ws_ptr = ws_ptr;
1021 /* get the connection's main CQ */
1022 if ((ret = rpma_conn_get_cq(csd->conn, &csd->cq))) {
1023 librpma_td_verror(td, ret, "rpma_conn_get_cq");
1024 goto err_conn_delete;
/* error-unwind path: connection -> request -> MR -> memory -> endpoint */
1030 (void) rpma_conn_delete(&conn);
1033 (void) rpma_conn_req_delete(&conn_req);
1036 (void) rpma_mr_dereg(&mr);
1039 librpma_fio_free(&csd->mem);
1042 (void) rpma_ep_shutdown(&ep);
/*
 * Server-side "close": wait for the client to close the connection,
 * disconnect and delete it, deregister the workspace MR, and free the
 * workspace memory. (Return value handling runs past this view.)
 */
1047 int librpma_fio_server_close_file(struct thread_data *td, struct fio_file *f)
1049 struct librpma_fio_server_data *csd = td->io_ops_data;
1050 enum rpma_conn_event conn_event = RPMA_CONN_UNDEFINED;
1054 /* wait for the connection to be closed */
1055 ret = rpma_conn_next_event(csd->conn, &conn_event);
1056 if (!ret && conn_event != RPMA_CONN_CLOSED) {
1057 log_err("rpma_conn_next_event returned an unexptected event\n");
1061 if ((ret = rpma_conn_disconnect(csd->conn))) {
1062 librpma_td_verror(td, ret, "rpma_conn_disconnect");
1066 if ((ret = rpma_conn_delete(&csd->conn))) {
1067 librpma_td_verror(td, ret, "rpma_conn_delete");
1071 if ((ret = rpma_mr_dereg(&csd->ws_mr))) {
1072 librpma_td_verror(td, ret, "rpma_mr_dereg");
1076 librpma_fio_free(&csd->mem);