2 * librpma_fio: librpma_apm and librpma_gpspm engines' common part.
4 * Copyright 2021, Intel Corporation
6 * This program is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU General Public License,
8 * version 2 as published by the Free Software Foundation.
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
16 #include "librpma_fio.h"
20 struct fio_option librpma_fio_options[] = {
23 .lname = "rpma_server_ip",
24 .type = FIO_OPT_STR_STORE,
25 .off1 = offsetof(struct librpma_fio_options_values, server_ip),
26 .help = "IP address the server is listening on",
28 .category = FIO_OPT_C_ENGINE,
29 .group = FIO_OPT_G_LIBRPMA,
33 .lname = "rpma_server port",
34 .type = FIO_OPT_STR_STORE,
35 .off1 = offsetof(struct librpma_fio_options_values, port),
36 .help = "port the server is listening on",
38 .category = FIO_OPT_C_ENGINE,
39 .group = FIO_OPT_G_LIBRPMA,
42 .name = "direct_write_to_pmem",
43 .lname = "Direct Write to PMem (via RDMA) from the remote host is possible",
45 .off1 = offsetof(struct librpma_fio_options_values,
46 direct_write_to_pmem),
47 .help = "Set to true ONLY when Direct Write to PMem from the remote host is possible (https://pmem.io/rpma/documentation/basic-direct-write-to-pmem.html)",
49 .category = FIO_OPT_C_ENGINE,
50 .group = FIO_OPT_G_LIBRPMA,
53 .name = "busy_wait_polling",
54 .lname = "Set to 0 to wait for completion instead of busy-wait polling completion.",
56 .off1 = offsetof(struct librpma_fio_options_values,
58 .help = "Set to false if you want to reduce CPU usage",
60 .category = FIO_OPT_C_ENGINE,
61 .group = FIO_OPT_G_LIBRPMA,
68 int librpma_fio_td_port(const char *port_base_str, struct thread_data *td,
71 unsigned long int port_ul = strtoul(port_base_str, NULL, 10);
72 unsigned int port_new;
76 if (port_ul == ULONG_MAX) {
77 td_verror(td, errno, "strtoul");
80 port_ul += td->thread_number - 1;
81 if (port_ul >= UINT_MAX) {
82 log_err("[%u] port number (%lu) bigger than UINT_MAX\n",
83 td->thread_number, port_ul);
88 snprintf(port_out, LIBRPMA_FIO_PORT_STR_LEN_MAX - 1, "%u", port_new);
93 char *librpma_fio_allocate_dram(struct thread_data *td, size_t size,
94 struct librpma_fio_mem *mem)
99 if ((ret = posix_memalign((void **)&mem_ptr, page_size, size))) {
100 log_err("fio: posix_memalign() failed\n");
101 td_verror(td, ret, "posix_memalign");
105 mem->mem_ptr = mem_ptr;
111 char *librpma_fio_allocate_pmem(struct thread_data *td, struct fio_file *f,
112 size_t size, struct librpma_fio_mem *mem)
114 size_t size_mmap = 0;
115 char *mem_ptr = NULL;
119 if (size % page_size) {
120 log_err("fio: size (%zu) is not aligned to page size (%zu)\n",
125 if (f->filetype == FIO_TYPE_CHAR) {
126 /* Each thread uses a separate offset within DeviceDAX. */
127 ws_offset = (td->thread_number - 1) * size;
129 /* Each thread uses a separate FileSystemDAX file. No offset is needed. */
134 log_err("fio: filename is not set\n");
139 mem_ptr = pmem_map_file(f->file_name, 0 /* len */, 0 /* flags */,
140 0 /* mode */, &size_mmap, &is_pmem);
141 if (mem_ptr == NULL) {
142 log_err("fio: pmem_map_file(%s) failed\n", f->file_name);
143 /* pmem_map_file() sets errno on failure */
144 td_verror(td, errno, "pmem_map_file");
148 /* pmem is expected */
150 log_err("fio: %s is not located in persistent memory\n",
155 /* check size of allocated persistent memory */
156 if (size_mmap < ws_offset + size) {
158 "fio: %s is too small to handle so many threads (%zu < %zu)\n",
159 f->file_name, size_mmap, ws_offset + size);
163 log_info("fio: size of memory mapped from the file %s: %zu\n",
164 f->file_name, size_mmap);
166 mem->mem_ptr = mem_ptr;
167 mem->size_mmap = size_mmap;
169 return mem_ptr + ws_offset;
172 (void) pmem_unmap(mem_ptr, size_mmap);
176 void librpma_fio_free(struct librpma_fio_mem *mem)
179 (void) pmem_unmap(mem->mem_ptr, mem->size_mmap);
184 #define LIBRPMA_FIO_RETRY_MAX_NO 10
185 #define LIBRPMA_FIO_RETRY_DELAY_S 5
187 int librpma_fio_client_init(struct thread_data *td,
188 struct rpma_conn_cfg *cfg)
190 struct librpma_fio_client_data *ccd;
191 struct librpma_fio_options_values *o = td->eo;
192 struct ibv_context *dev = NULL;
193 char port_td[LIBRPMA_FIO_PORT_STR_LEN_MAX];
194 struct rpma_conn_req *req = NULL;
195 enum rpma_conn_event event;
196 struct rpma_conn_private_data pdata;
197 enum rpma_log_level log_level_aux = RPMA_LOG_LEVEL_WARNING;
198 int remote_flush_type;
202 /* --debug=net sets RPMA_LOG_THRESHOLD_AUX to RPMA_LOG_LEVEL_INFO */
204 if ((1UL << FD_NET) & fio_debug)
205 log_level_aux = RPMA_LOG_LEVEL_INFO;
208 /* configure logging thresholds to see more details */
209 rpma_log_set_threshold(RPMA_LOG_THRESHOLD, RPMA_LOG_LEVEL_INFO);
210 rpma_log_set_threshold(RPMA_LOG_THRESHOLD_AUX, log_level_aux);
212 /* obtain an IBV context for a remote IP address */
213 if ((ret = rpma_utils_get_ibv_context(o->server_ip,
214 RPMA_UTIL_IBV_CONTEXT_REMOTE, &dev))) {
215 librpma_td_verror(td, ret, "rpma_utils_get_ibv_context");
219 /* allocate client's data */
220 ccd = calloc(1, sizeof(*ccd));
222 td_verror(td, errno, "calloc");
226 /* allocate all in-memory queues */
227 ccd->io_us_queued = calloc(td->o.iodepth, sizeof(*ccd->io_us_queued));
228 if (ccd->io_us_queued == NULL) {
229 td_verror(td, errno, "calloc");
233 ccd->io_us_flight = calloc(td->o.iodepth, sizeof(*ccd->io_us_flight));
234 if (ccd->io_us_flight == NULL) {
235 td_verror(td, errno, "calloc");
236 goto err_free_io_u_queues;
239 ccd->io_us_completed = calloc(td->o.iodepth,
240 sizeof(*ccd->io_us_completed));
241 if (ccd->io_us_completed == NULL) {
242 td_verror(td, errno, "calloc");
243 goto err_free_io_u_queues;
246 /* create a new peer object */
247 if ((ret = rpma_peer_new(dev, &ccd->peer))) {
248 librpma_td_verror(td, ret, "rpma_peer_new");
249 goto err_free_io_u_queues;
252 /* create a connection request */
253 if (librpma_fio_td_port(o->port, td, port_td))
254 goto err_peer_delete;
256 for (retry = 0; retry < LIBRPMA_FIO_RETRY_MAX_NO; retry++) {
257 if ((ret = rpma_conn_req_new(ccd->peer, o->server_ip, port_td,
259 librpma_td_verror(td, ret, "rpma_conn_req_new");
260 goto err_peer_delete;
264 * Connect the connection request
265 * and obtain the connection object.
267 if ((ret = rpma_conn_req_connect(&req, NULL, &ccd->conn))) {
268 librpma_td_verror(td, ret, "rpma_conn_req_connect");
272 /* wait for the connection to establish */
273 if ((ret = rpma_conn_next_event(ccd->conn, &event))) {
274 librpma_td_verror(td, ret, "rpma_conn_next_event");
275 goto err_conn_delete;
276 } else if (event == RPMA_CONN_ESTABLISHED) {
278 } else if (event == RPMA_CONN_REJECTED) {
279 (void) rpma_conn_disconnect(ccd->conn);
280 (void) rpma_conn_delete(&ccd->conn);
281 if (retry < LIBRPMA_FIO_RETRY_MAX_NO - 1) {
282 log_err("Thread [%d]: Retrying (#%i) ...\n",
283 td->thread_number, retry + 1);
284 sleep(LIBRPMA_FIO_RETRY_DELAY_S);
287 "Thread [%d]: The maximum number of retries exceeded. Closing.\n",
292 "rpma_conn_next_event returned an unexptected event: (%s != RPMA_CONN_ESTABLISHED)\n",
293 rpma_utils_conn_event_2str(event));
294 goto err_conn_delete;
299 log_err("Thread [%d]: Connected after retry #%i\n",
300 td->thread_number, retry);
302 if (ccd->conn == NULL)
303 goto err_peer_delete;
305 /* get the connection's private data sent from the server */
306 if ((ret = rpma_conn_get_private_data(ccd->conn, &pdata))) {
307 librpma_td_verror(td, ret, "rpma_conn_get_private_data");
308 goto err_conn_delete;
311 /* get the server's workspace representation */
314 /* create the server's memory representation */
315 if ((ret = rpma_mr_remote_from_descriptor(&ccd->ws->descriptor[0],
316 ccd->ws->mr_desc_size, &ccd->server_mr))) {
317 librpma_td_verror(td, ret, "rpma_mr_remote_from_descriptor");
318 goto err_conn_delete;
321 /* get the total size of the shared server memory */
322 if ((ret = rpma_mr_remote_get_size(ccd->server_mr, &ccd->ws_size))) {
323 librpma_td_verror(td, ret, "rpma_mr_remote_get_size");
324 goto err_conn_delete;
327 /* get flush type of the remote node */
328 if ((ret = rpma_mr_remote_get_flush_type(ccd->server_mr,
329 &remote_flush_type))) {
330 librpma_td_verror(td, ret, "rpma_mr_remote_get_flush_type");
331 goto err_conn_delete;
334 ccd->server_mr_flush_type =
335 (remote_flush_type & RPMA_MR_USAGE_FLUSH_TYPE_PERSISTENT) ?
336 RPMA_FLUSH_TYPE_PERSISTENT : RPMA_FLUSH_TYPE_VISIBILITY;
339 * Assure an io_us buffer allocation is page-size-aligned which is required
340 * to register for RDMA. User-provided value is intentionally ignored.
342 td->o.mem_align = page_size;
344 td->io_ops_data = ccd;
349 (void) rpma_conn_disconnect(ccd->conn);
350 (void) rpma_conn_delete(&ccd->conn);
353 (void) rpma_conn_req_delete(&req);
356 (void) rpma_peer_delete(&ccd->peer);
358 err_free_io_u_queues:
359 free(ccd->io_us_queued);
360 free(ccd->io_us_flight);
361 free(ccd->io_us_completed);
369 void librpma_fio_client_cleanup(struct thread_data *td)
371 struct librpma_fio_client_data *ccd = td->io_ops_data;
372 enum rpma_conn_event ev;
378 /* delete the iou's memory registration */
379 if ((ret = rpma_mr_dereg(&ccd->orig_mr)))
380 librpma_td_verror(td, ret, "rpma_mr_dereg");
381 /* delete the iou's memory registration */
382 if ((ret = rpma_mr_remote_delete(&ccd->server_mr)))
383 librpma_td_verror(td, ret, "rpma_mr_remote_delete");
384 /* initiate disconnection */
385 if ((ret = rpma_conn_disconnect(ccd->conn)))
386 librpma_td_verror(td, ret, "rpma_conn_disconnect");
387 /* wait for disconnection to end up */
388 if ((ret = rpma_conn_next_event(ccd->conn, &ev))) {
389 librpma_td_verror(td, ret, "rpma_conn_next_event");
390 } else if (ev != RPMA_CONN_CLOSED) {
392 "client_cleanup received an unexpected event (%s != RPMA_CONN_CLOSED)\n",
393 rpma_utils_conn_event_2str(ev));
395 /* delete the connection */
396 if ((ret = rpma_conn_delete(&ccd->conn)))
397 librpma_td_verror(td, ret, "rpma_conn_delete");
398 /* delete the peer */
399 if ((ret = rpma_peer_delete(&ccd->peer)))
400 librpma_td_verror(td, ret, "rpma_peer_delete");
401 /* free the software queues */
402 free(ccd->io_us_queued);
403 free(ccd->io_us_flight);
404 free(ccd->io_us_completed);
406 td->io_ops_data = NULL; /* zero ccd */
/*
 * No-op open/close hook: the engines manage their "files" (the remote or
 * local workspace) themselves, so fio's per-file open/close does nothing.
 */
int librpma_fio_file_nop(struct thread_data *td, struct fio_file *f)
{
	/* NOP */
	return 0;
}
415 int librpma_fio_client_post_init(struct thread_data *td)
417 struct librpma_fio_client_data *ccd = td->io_ops_data;
422 * td->orig_buffer is not aligned. The engine requires aligned io_us
423 * so FIO alignes up the address using the formula below.
425 ccd->orig_buffer_aligned = PTR_ALIGN(td->orig_buffer, page_mask) +
429 * td->orig_buffer_size beside the space really consumed by io_us
430 * has paddings which can be omitted for the memory registration.
432 io_us_size = (unsigned long long)td_max_bs(td) *
433 (unsigned long long)td->o.iodepth;
435 if ((ret = rpma_mr_reg(ccd->peer, ccd->orig_buffer_aligned, io_us_size,
436 RPMA_MR_USAGE_READ_DST | RPMA_MR_USAGE_READ_SRC |
437 RPMA_MR_USAGE_WRITE_DST | RPMA_MR_USAGE_WRITE_SRC |
438 RPMA_MR_USAGE_FLUSH_TYPE_PERSISTENT, &ccd->orig_mr)))
439 librpma_td_verror(td, ret, "rpma_mr_reg");
443 int librpma_fio_client_get_file_size(struct thread_data *td,
446 struct librpma_fio_client_data *ccd = td->io_ops_data;
448 f->real_file_size = ccd->ws_size;
449 fio_file_set_size_known(f);
454 static enum fio_q_status client_queue_sync(struct thread_data *td,
457 struct librpma_fio_client_data *ccd = td->io_ops_data;
458 struct rpma_completion cmpl;
463 if (io_u->ddir == DDIR_READ) {
464 /* post an RDMA read operation */
465 if (librpma_fio_client_io_read(td, io_u,
466 RPMA_F_COMPLETION_ALWAYS))
468 } else if (io_u->ddir == DDIR_WRITE) {
469 /* post an RDMA write operation */
470 if (librpma_fio_client_io_write(td, io_u))
472 if (ccd->flush(td, io_u, io_u, io_u->xfer_buflen))
475 log_err("unsupported IO mode: %s\n", io_ddir_name(io_u->ddir));
480 /* get a completion */
481 ret = rpma_conn_completion_get(ccd->conn, &cmpl);
482 if (ret == RPMA_E_NO_COMPLETION) {
483 /* lack of completion is not an error */
485 } else if (ret != 0) {
486 /* an error occurred */
487 librpma_td_verror(td, ret, "rpma_conn_completion_get");
491 /* if io_us has completed with an error */
492 if (cmpl.op_status != IBV_WC_SUCCESS)
495 if (cmpl.op == RPMA_OP_SEND)
496 ++ccd->op_send_completed;
498 if (cmpl.op == RPMA_OP_RECV)
499 ++ccd->op_recv_completed;
505 if (ccd->get_io_u_index(&cmpl, &io_u_index) != 1)
508 if (io_u->index != io_u_index) {
510 "no matching io_u for received completion found (io_u_index=%u)\n",
515 /* make sure all SENDs are completed before exit - clean up SQ */
516 if (librpma_fio_client_io_complete_all_sends(td))
519 return FIO_Q_COMPLETED;
523 return FIO_Q_COMPLETED;
526 enum fio_q_status librpma_fio_client_queue(struct thread_data *td,
529 struct librpma_fio_client_data *ccd = td->io_ops_data;
531 if (ccd->io_u_queued_nr == (int)td->o.iodepth)
535 return client_queue_sync(td, io_u);
537 /* io_u -> queued[] */
538 ccd->io_us_queued[ccd->io_u_queued_nr] = io_u;
539 ccd->io_u_queued_nr++;
544 int librpma_fio_client_commit(struct thread_data *td)
546 struct librpma_fio_client_data *ccd = td->io_ops_data;
547 int flags = RPMA_F_COMPLETION_ON_ERROR;
551 struct io_u *flush_first_io_u = NULL;
552 unsigned long long int flush_len = 0;
554 if (!ccd->io_us_queued)
557 /* execute all io_us from queued[] */
558 for (i = 0; i < ccd->io_u_queued_nr; i++) {
559 struct io_u *io_u = ccd->io_us_queued[i];
561 if (io_u->ddir == DDIR_READ) {
562 if (i + 1 == ccd->io_u_queued_nr ||
563 ccd->io_us_queued[i + 1]->ddir == DDIR_WRITE)
564 flags = RPMA_F_COMPLETION_ALWAYS;
565 /* post an RDMA read operation */
566 if (librpma_fio_client_io_read(td, io_u, flags))
568 } else if (io_u->ddir == DDIR_WRITE) {
569 /* post an RDMA write operation */
570 if (librpma_fio_client_io_write(td, io_u))
573 /* cache the first io_u in the sequence */
574 if (flush_first_io_u == NULL)
575 flush_first_io_u = io_u;
578 * the flush length is the sum of all io_u's creating
581 flush_len += io_u->xfer_buflen;
584 * if io_u's are random the rpma_flush is required
585 * after each one of them
587 if (!td_random(td)) {
589 * When the io_u's are sequential and
590 * the current io_u is not the last one and
591 * the next one is also a write operation
592 * the flush can be postponed by one io_u and
593 * cover all of them which build a continuous
596 if ((i + 1 < ccd->io_u_queued_nr) &&
597 (ccd->io_us_queued[i + 1]->ddir == DDIR_WRITE))
601 /* flush all writes which build a continuous sequence */
602 if (ccd->flush(td, flush_first_io_u, io_u, flush_len))
606 * reset the flush parameters in preparation for
609 flush_first_io_u = NULL;
612 log_err("unsupported IO mode: %s\n",
613 io_ddir_name(io_u->ddir));
618 if ((fill_time = fio_fill_issue_time(td)))
619 fio_gettime(&now, NULL);
621 /* move executed io_us from queued[] to flight[] */
622 for (i = 0; i < ccd->io_u_queued_nr; i++) {
623 struct io_u *io_u = ccd->io_us_queued[i];
625 /* FIO does not do this if the engine is asynchronous */
627 memcpy(&io_u->issue_time, &now, sizeof(now));
629 /* move executed io_us from queued[] to flight[] */
630 ccd->io_us_flight[ccd->io_u_flight_nr] = io_u;
631 ccd->io_u_flight_nr++;
635 * If an engine has the commit hook
636 * it has to call io_u_queued() itself.
638 io_u_queued(td, io_u);
641 /* FIO does not do this if an engine has the commit hook. */
642 io_u_mark_submit(td, ccd->io_u_queued_nr);
643 ccd->io_u_queued_nr = 0;
650 * - > 0 - a number of completed io_us
651 * - 0 - when no complicitions received
652 * - (-1) - when an error occurred
654 static int client_getevent_process(struct thread_data *td)
656 struct librpma_fio_client_data *ccd = td->io_ops_data;
657 struct rpma_completion cmpl;
658 /* io_u->index of completed io_u (cmpl.op_context) */
659 unsigned int io_u_index;
660 /* # of completed io_us */
667 /* get a completion */
668 if ((ret = rpma_conn_completion_get(ccd->conn, &cmpl))) {
669 /* lack of completion is not an error */
670 if (ret == RPMA_E_NO_COMPLETION) {
671 /* lack of completion is not an error */
675 /* an error occurred */
676 librpma_td_verror(td, ret, "rpma_conn_completion_get");
680 /* if io_us has completed with an error */
681 if (cmpl.op_status != IBV_WC_SUCCESS) {
682 td->error = cmpl.op_status;
686 if (cmpl.op == RPMA_OP_SEND)
687 ++ccd->op_send_completed;
688 else if (cmpl.op == RPMA_OP_RECV)
689 ++ccd->op_recv_completed;
691 if ((ret = ccd->get_io_u_index(&cmpl, &io_u_index)) != 1)
694 /* look for an io_u being completed */
695 for (i = 0; i < ccd->io_u_flight_nr; ++i) {
696 if (ccd->io_us_flight[i]->index == io_u_index) {
702 /* if no matching io_u has been found */
705 "no matching io_u for received completion found (io_u_index=%u)\n",
710 /* move completed io_us to the completed in-memory queue */
711 for (i = 0; i < cmpl_num; ++i) {
712 /* get and prepare io_u */
713 io_u = ccd->io_us_flight[i];
715 /* append to the queue */
716 ccd->io_us_completed[ccd->io_u_completed_nr] = io_u;
717 ccd->io_u_completed_nr++;
720 /* remove completed io_us from the flight queue */
721 for (i = cmpl_num; i < ccd->io_u_flight_nr; ++i)
722 ccd->io_us_flight[i - cmpl_num] = ccd->io_us_flight[i];
723 ccd->io_u_flight_nr -= cmpl_num;
728 int librpma_fio_client_getevents(struct thread_data *td, unsigned int min,
729 unsigned int max, const struct timespec *t)
731 struct librpma_fio_client_data *ccd = td->io_ops_data;
732 /* total # of completed io_us */
733 int cmpl_num_total = 0;
734 /* # of completed io_us from a single event */
738 cmpl_num = client_getevent_process(td);
740 /* new completions collected */
741 cmpl_num_total += cmpl_num;
742 } else if (cmpl_num == 0) {
744 * It is required to make sure that CQEs for SENDs
745 * will flow at least at the same pace as CQEs for RECVs.
747 if (cmpl_num_total >= min &&
748 ccd->op_send_completed >= ccd->op_recv_completed)
752 * To reduce CPU consumption one can use
753 * the rpma_conn_completion_wait() function.
754 * Note this greatly increase the latency
755 * and make the results less stable.
756 * The bandwidth stays more or less the same.
759 /* an error occurred */
764 * The expected max can be exceeded if CQEs for RECVs will come up
765 * faster than CQEs for SENDs. But it is required to make sure CQEs for
766 * SENDs will flow at least at the same pace as CQEs for RECVs.
768 } while (cmpl_num_total < max ||
769 ccd->op_send_completed < ccd->op_recv_completed);
772 * All posted SENDs are completed and RECVs for them (responses) are
773 * completed. This is the initial situation so the counters are reset.
775 if (ccd->op_send_posted == ccd->op_send_completed &&
776 ccd->op_send_completed == ccd->op_recv_completed) {
777 ccd->op_send_posted = 0;
778 ccd->op_send_completed = 0;
779 ccd->op_recv_completed = 0;
782 return cmpl_num_total;
785 struct io_u *librpma_fio_client_event(struct thread_data *td, int event)
787 struct librpma_fio_client_data *ccd = td->io_ops_data;
791 /* get the first io_u from the queue */
792 io_u = ccd->io_us_completed[0];
794 /* remove the first io_u from the queue */
795 for (i = 1; i < ccd->io_u_completed_nr; ++i)
796 ccd->io_us_completed[i - 1] = ccd->io_us_completed[i];
797 ccd->io_u_completed_nr--;
799 dprint_io_u(io_u, "client_event");
804 char *librpma_fio_client_errdetails(struct io_u *io_u)
806 /* get the string representation of an error */
807 enum ibv_wc_status status = io_u->error;
808 const char *status_str = ibv_wc_status_str(status);
810 char *details = strdup(status_str);
811 if (details == NULL) {
812 fprintf(stderr, "Error: %s\n", status_str);
813 fprintf(stderr, "Fatal error: out of memory. Aborting.\n");
817 /* FIO frees the returned string when it becomes obsolete */
821 int librpma_fio_server_init(struct thread_data *td)
823 struct librpma_fio_options_values *o = td->eo;
824 struct librpma_fio_server_data *csd;
825 struct ibv_context *dev = NULL;
826 enum rpma_log_level log_level_aux = RPMA_LOG_LEVEL_WARNING;
829 /* --debug=net sets RPMA_LOG_THRESHOLD_AUX to RPMA_LOG_LEVEL_INFO */
831 if ((1UL << FD_NET) & fio_debug)
832 log_level_aux = RPMA_LOG_LEVEL_INFO;
835 /* configure logging thresholds to see more details */
836 rpma_log_set_threshold(RPMA_LOG_THRESHOLD, RPMA_LOG_LEVEL_INFO);
837 rpma_log_set_threshold(RPMA_LOG_THRESHOLD_AUX, log_level_aux);
840 /* obtain an IBV context for a remote IP address */
841 if ((ret = rpma_utils_get_ibv_context(o->server_ip,
842 RPMA_UTIL_IBV_CONTEXT_LOCAL, &dev))) {
843 librpma_td_verror(td, ret, "rpma_utils_get_ibv_context");
847 /* allocate server's data */
848 csd = calloc(1, sizeof(*csd));
850 td_verror(td, errno, "calloc");
854 /* create a new peer object */
855 if ((ret = rpma_peer_new(dev, &csd->peer))) {
856 librpma_td_verror(td, ret, "rpma_peer_new");
860 td->io_ops_data = csd;
870 void librpma_fio_server_cleanup(struct thread_data *td)
872 struct librpma_fio_server_data *csd = td->io_ops_data;
879 if ((ret = rpma_peer_delete(&csd->peer)))
880 librpma_td_verror(td, ret, "rpma_peer_delete");
885 int librpma_fio_server_open_file(struct thread_data *td, struct fio_file *f,
886 struct rpma_conn_cfg *cfg)
888 struct librpma_fio_server_data *csd = td->io_ops_data;
889 struct librpma_fio_options_values *o = td->eo;
890 enum rpma_conn_event conn_event = RPMA_CONN_UNDEFINED;
891 struct librpma_fio_workspace ws = {0};
892 struct rpma_conn_private_data pdata;
893 uint32_t max_msg_num;
894 struct rpma_conn_req *conn_req;
895 struct rpma_conn *conn;
896 struct rpma_mr_local *mr;
897 char port_td[LIBRPMA_FIO_PORT_STR_LEN_MAX];
899 size_t mem_size = td->o.size;
907 log_err("fio: filename is not set\n");
911 /* start a listening endpoint at addr:port */
912 if (librpma_fio_td_port(o->port, td, port_td))
915 if ((ret = rpma_ep_listen(csd->peer, o->server_ip, port_td, &ep))) {
916 librpma_td_verror(td, ret, "rpma_ep_listen");
920 is_dram = !strcmp(f->file_name, "malloc");
922 /* allocation from DRAM using posix_memalign() */
923 ws_ptr = librpma_fio_allocate_dram(td, mem_size, &csd->mem);
924 usage_mem_type = RPMA_MR_USAGE_FLUSH_TYPE_VISIBILITY;
926 /* allocation from PMEM using pmem_map_file() */
927 ws_ptr = librpma_fio_allocate_pmem(td, f, mem_size, &csd->mem);
928 usage_mem_type = RPMA_MR_USAGE_FLUSH_TYPE_PERSISTENT;
932 goto err_ep_shutdown;
934 f->real_file_size = mem_size;
936 if ((ret = rpma_mr_reg(csd->peer, ws_ptr, mem_size,
937 RPMA_MR_USAGE_READ_DST | RPMA_MR_USAGE_READ_SRC |
938 RPMA_MR_USAGE_WRITE_DST | RPMA_MR_USAGE_WRITE_SRC |
939 usage_mem_type, &mr))) {
940 librpma_td_verror(td, ret, "rpma_mr_reg");
944 if (!is_dram && f->filetype == FIO_TYPE_FILE) {
945 ret = rpma_mr_advise(mr, 0, mem_size,
946 IBV_ADVISE_MR_ADVICE_PREFETCH_WRITE,
947 IBV_ADVISE_MR_FLAG_FLUSH);
949 librpma_td_verror(td, ret, "rpma_mr_advise");
950 /* an invalid argument is an error */
951 if (ret == RPMA_E_INVAL)
954 /* log_err used instead of log_info to avoid corruption of the JSON output */
955 log_err("Note: having rpma_mr_advise(3) failed because of RPMA_E_NOSUPP or RPMA_E_PROVIDER may come with a performance penalty, but it is not a blocker for running the benchmark.\n");
959 /* get size of the memory region's descriptor */
960 if ((ret = rpma_mr_get_descriptor_size(mr, &mr_desc_size))) {
961 librpma_td_verror(td, ret, "rpma_mr_get_descriptor_size");
965 /* verify size of the memory region's descriptor */
966 if (mr_desc_size > LIBRPMA_FIO_DESCRIPTOR_MAX_SIZE) {
968 "size of the memory region's descriptor is too big (max=%i)\n",
969 LIBRPMA_FIO_DESCRIPTOR_MAX_SIZE);
973 /* get the memory region's descriptor */
974 if ((ret = rpma_mr_get_descriptor(mr, &ws.descriptor[0]))) {
975 librpma_td_verror(td, ret, "rpma_mr_get_descriptor");
980 if ((ret = rpma_conn_cfg_get_rq_size(cfg, &max_msg_num))) {
981 librpma_td_verror(td, ret, "rpma_conn_cfg_get_rq_size");
985 /* verify whether iodepth fits into uint16_t */
986 if (max_msg_num > UINT16_MAX) {
987 log_err("fio: iodepth too big (%u > %u)\n",
988 max_msg_num, UINT16_MAX);
992 ws.max_msg_num = max_msg_num;
995 /* prepare a workspace description */
996 ws.direct_write_to_pmem = o->direct_write_to_pmem;
997 ws.mr_desc_size = mr_desc_size;
999 pdata.len = sizeof(ws);
1001 /* receive an incoming connection request */
1002 if ((ret = rpma_ep_next_conn_req(ep, cfg, &conn_req))) {
1003 librpma_td_verror(td, ret, "rpma_ep_next_conn_req");
1007 if (csd->prepare_connection && csd->prepare_connection(td, conn_req))
1008 goto err_req_delete;
1010 /* accept the connection request and obtain the connection object */
1011 if ((ret = rpma_conn_req_connect(&conn_req, &pdata, &conn))) {
1012 librpma_td_verror(td, ret, "rpma_conn_req_connect");
1013 goto err_req_delete;
1016 /* wait for the connection to be established */
1017 if ((ret = rpma_conn_next_event(conn, &conn_event))) {
1018 librpma_td_verror(td, ret, "rpma_conn_next_event");
1019 goto err_conn_delete;
1020 } else if (conn_event != RPMA_CONN_ESTABLISHED) {
1021 log_err("rpma_conn_next_event returned an unexptected event\n");
1022 goto err_conn_delete;
1025 /* end-point is no longer needed */
1026 (void) rpma_ep_shutdown(&ep);
1029 csd->ws_ptr = ws_ptr;
1035 (void) rpma_conn_delete(&conn);
1038 (void) rpma_conn_req_delete(&conn_req);
1041 (void) rpma_mr_dereg(&mr);
1044 librpma_fio_free(&csd->mem);
1047 (void) rpma_ep_shutdown(&ep);
1052 int librpma_fio_server_close_file(struct thread_data *td, struct fio_file *f)
1054 struct librpma_fio_server_data *csd = td->io_ops_data;
1055 enum rpma_conn_event conn_event = RPMA_CONN_UNDEFINED;
1059 /* wait for the connection to be closed */
1060 ret = rpma_conn_next_event(csd->conn, &conn_event);
1061 if (!ret && conn_event != RPMA_CONN_CLOSED) {
1062 log_err("rpma_conn_next_event returned an unexptected event\n");
1066 if ((ret = rpma_conn_disconnect(csd->conn))) {
1067 librpma_td_verror(td, ret, "rpma_conn_disconnect");
1071 if ((ret = rpma_conn_delete(&csd->conn))) {
1072 librpma_td_verror(td, ret, "rpma_conn_delete");
1076 if ((ret = rpma_mr_dereg(&csd->ws_mr))) {
1077 librpma_td_verror(td, ret, "rpma_mr_dereg");
1081 librpma_fio_free(&csd->mem);