Merge branch 'proposed_fix' of https://github.com/weberc-ntap/fio
[fio.git] / engines / librpma_fio.c
1/*
2 * librpma_fio: librpma_apm and librpma_gpspm engines' common part.
3 *
4 * Copyright 2021, Intel Corporation
5 *
6 * This program is free software; you can redistribute it and/or
7 * modify it under the terms of the GNU General Public License,
8 * version 2 as published by the Free Software Foundation.
9 *
10 * This program is distributed in the hope that it will be useful,
11 * but WITHOUT ANY WARRANTY; without even the implied warranty of
12 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13 * GNU General Public License for more details.
14 */
15
16#include "librpma_fio.h"
17
18#include <libpmem.h>
19
20struct fio_option librpma_fio_options[] = {
21 {
22 .name = "serverip",
23 .lname = "rpma_server_ip",
24 .type = FIO_OPT_STR_STORE,
25 .off1 = offsetof(struct librpma_fio_options_values, server_ip),
26 .help = "IP address the server is listening on",
27 .def = "",
28 .category = FIO_OPT_C_ENGINE,
29 .group = FIO_OPT_G_LIBRPMA,
30 },
31 {
32 .name = "port",
33 .lname = "rpma_server_port",
34 .type = FIO_OPT_STR_STORE,
35 .off1 = offsetof(struct librpma_fio_options_values, port),
36 .help = "port the server is listening on",
37 .def = "7204",
38 .category = FIO_OPT_C_ENGINE,
39 .group = FIO_OPT_G_LIBRPMA,
40 },
41 {
42 .name = "direct_write_to_pmem",
43 .lname = "Direct Write to PMem (via RDMA) from the remote host is possible",
44 .type = FIO_OPT_BOOL,
45 .off1 = offsetof(struct librpma_fio_options_values,
46 direct_write_to_pmem),
47 .help = "Set to true ONLY when Direct Write to PMem from the remote host is possible (https://pmem.io/rpma/documentation/basic-direct-write-to-pmem.html)",
48 .def = "",
49 .category = FIO_OPT_C_ENGINE,
50 .group = FIO_OPT_G_LIBRPMA,
51 },
52 {
53 .name = "busy_wait_polling",
54 .lname = "Set to 0 to wait for completion instead of busy-wait polling for it.",
55 .type = FIO_OPT_BOOL,
56 .off1 = offsetof(struct librpma_fio_options_values,
57 busy_wait_polling),
58 .help = "Set to false if you want to reduce CPU usage",
59 .def = "1",
60 .category = FIO_OPT_C_ENGINE,
61 .group = FIO_OPT_G_LIBRPMA,
62 },
63 {
64 .name = NULL,
65 },
66};
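/*
 * For illustration only (a hypothetical job file, not part of this engine):
 * with one of the librpma_apm/librpma_gpspm ioengines selected, the options
 * above are expected to appear in a job description roughly as:
 *
 *   serverip=192.168.0.1
 *   port=7204
 *   direct_write_to_pmem=1
 *   busy_wait_polling=0
 *
 * serverip and port select the listening server, direct_write_to_pmem may be
 * set only when Direct Write to PMem from the remote host is actually
 * possible, and busy_wait_polling=0 trades latency for lower CPU usage.
 */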
67
68int librpma_fio_td_port(const char *port_base_str, struct thread_data *td,
69 char *port_out)
70{
71 unsigned long int port_ul = strtoul(port_base_str, NULL, 10);
72 unsigned int port_new;
73
74 port_out[0] = '\0';
75
76 if (port_ul == ULONG_MAX) {
77 td_verror(td, errno, "strtoul");
78 return -1;
79 }
80 port_ul += td->thread_number - 1;
81 if (port_ul >= UINT_MAX) {
82 log_err("[%u] port number (%lu) bigger than UINT_MAX\n",
83 td->thread_number, port_ul);
84 return -1;
85 }
86
87 port_new = port_ul;
88 snprintf(port_out, LIBRPMA_FIO_PORT_STR_LEN_MAX - 1, "%u", port_new);
89
90 return 0;
91}
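/*
 * Example of the calculation above: with the default base port "7204",
 * thread #1 uses port 7204, thread #2 port 7205, and so on, since
 * (thread_number - 1) is added to the base port.
 */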
92
93char *librpma_fio_allocate_dram(struct thread_data *td, size_t size,
94 struct librpma_fio_mem *mem)
95{
96 char *mem_ptr = NULL;
97 int ret;
98
99 if ((ret = posix_memalign((void **)&mem_ptr, page_size, size))) {
100 log_err("fio: posix_memalign() failed\n");
101 td_verror(td, ret, "posix_memalign");
102 return NULL;
103 }
104
105 mem->mem_ptr = mem_ptr;
106 mem->size_mmap = 0;
107
108 return mem_ptr;
109}
110
111char *librpma_fio_allocate_pmem(struct thread_data *td, struct fio_file *f,
112 size_t size, struct librpma_fio_mem *mem)
113{
114 size_t size_mmap = 0;
115 char *mem_ptr = NULL;
116 int is_pmem = 0;
117 size_t ws_offset;
118
119 if (size % page_size) {
120 log_err("fio: size (%zu) is not aligned to page size (%zu)\n",
121 size, page_size);
122 return NULL;
123 }
124
125 if (f->filetype == FIO_TYPE_CHAR) {
126 /* Each thread uses a separate offset within DeviceDAX. */
127 ws_offset = (td->thread_number - 1) * size;
128 } else {
129 /* Each thread uses a separate FileSystemDAX file. No offset is needed. */
130 ws_offset = 0;
131 }
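/*
 * Worked example (assumed sizes): for a DeviceDAX character device with
 * size = 1 GiB per thread, thread #1 maps at offset 0, thread #2 at 1 GiB,
 * thread #3 at 2 GiB, and so on; for FileSystemDAX every thread opens its
 * own file, so the offset stays 0.
 */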
132
133 if (!f->file_name) {
134 log_err("fio: filename is not set\n");
135 return NULL;
136 }
137
138 /* map the file */
139 mem_ptr = pmem_map_file(f->file_name, 0 /* len */, 0 /* flags */,
140 0 /* mode */, &size_mmap, &is_pmem);
141 if (mem_ptr == NULL) {
142 log_err("fio: pmem_map_file(%s) failed\n", f->file_name);
143 /* pmem_map_file() sets errno on failure */
144 td_verror(td, errno, "pmem_map_file");
145 return NULL;
146 }
147
148 /* pmem is expected */
149 if (!is_pmem) {
150 log_err("fio: %s is not located in persistent memory\n",
151 f->file_name);
152 goto err_unmap;
153 }
154
155 /* check size of allocated persistent memory */
156 if (size_mmap < ws_offset + size) {
157 log_err(
158 "fio: %s is too small to handle so many threads (%zu < %zu)\n",
159 f->file_name, size_mmap, ws_offset + size);
160 goto err_unmap;
161 }
162
163 log_info("fio: size of memory mapped from the file %s: %zu\n",
164 f->file_name, size_mmap);
165
166 mem->mem_ptr = mem_ptr;
167 mem->size_mmap = size_mmap;
168
169 return mem_ptr + ws_offset;
170
171err_unmap:
172 (void) pmem_unmap(mem_ptr, size_mmap);
173 return NULL;
174}
175
176void librpma_fio_free(struct librpma_fio_mem *mem)
177{
178 if (mem->size_mmap)
179 (void) pmem_unmap(mem->mem_ptr, mem->size_mmap);
180 else
181 free(mem->mem_ptr);
182}
183
184#define LIBRPMA_FIO_RETRY_MAX_NO 10
185#define LIBRPMA_FIO_RETRY_DELAY_S 5
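/*
 * With the values above, a rejected connection is retried up to 10 times
 * with a 5-second delay between attempts, i.e. up to roughly 50 seconds of
 * retrying in the worst case (see the retry loop in
 * librpma_fio_client_init() below).
 */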
186
187int librpma_fio_client_init(struct thread_data *td,
188 struct rpma_conn_cfg *cfg)
189{
190 struct librpma_fio_client_data *ccd;
191 struct librpma_fio_options_values *o = td->eo;
192 struct ibv_context *dev = NULL;
193 char port_td[LIBRPMA_FIO_PORT_STR_LEN_MAX];
194 struct rpma_conn_req *req = NULL;
195 enum rpma_conn_event event;
196 struct rpma_conn_private_data pdata;
197 enum rpma_log_level log_level_aux = RPMA_LOG_LEVEL_WARNING;
198 int remote_flush_type;
199 int retry;
200 int ret;
201
202 /* --debug=net sets RPMA_LOG_THRESHOLD_AUX to RPMA_LOG_LEVEL_INFO */
203#ifdef FIO_INC_DEBUG
204 if ((1UL << FD_NET) & fio_debug)
205 log_level_aux = RPMA_LOG_LEVEL_INFO;
206#endif
207
208 /* configure logging thresholds to see more details */
209 rpma_log_set_threshold(RPMA_LOG_THRESHOLD, RPMA_LOG_LEVEL_INFO);
210 rpma_log_set_threshold(RPMA_LOG_THRESHOLD_AUX, log_level_aux);
211
212 /* obtain an IBV context for a remote IP address */
213 if ((ret = rpma_utils_get_ibv_context(o->server_ip,
214 RPMA_UTIL_IBV_CONTEXT_REMOTE, &dev))) {
215 librpma_td_verror(td, ret, "rpma_utils_get_ibv_context");
216 return -1;
217 }
218
219 /* allocate client's data */
220 ccd = calloc(1, sizeof(*ccd));
221 if (ccd == NULL) {
222 td_verror(td, errno, "calloc");
223 return -1;
224 }
225
226 /* allocate all in-memory queues */
227 ccd->io_us_queued = calloc(td->o.iodepth, sizeof(*ccd->io_us_queued));
228 if (ccd->io_us_queued == NULL) {
229 td_verror(td, errno, "calloc");
230 goto err_free_ccd;
231 }
232
233 ccd->io_us_flight = calloc(td->o.iodepth, sizeof(*ccd->io_us_flight));
234 if (ccd->io_us_flight == NULL) {
235 td_verror(td, errno, "calloc");
236 goto err_free_io_u_queues;
237 }
238
239 ccd->io_us_completed = calloc(td->o.iodepth,
240 sizeof(*ccd->io_us_completed));
241 if (ccd->io_us_completed == NULL) {
242 td_verror(td, errno, "calloc");
243 goto err_free_io_u_queues;
244 }
245
246 /* create a new peer object */
247 if ((ret = rpma_peer_new(dev, &ccd->peer))) {
248 librpma_td_verror(td, ret, "rpma_peer_new");
249 goto err_free_io_u_queues;
250 }
251
252 /* create a connection request */
253 if (librpma_fio_td_port(o->port, td, port_td))
254 goto err_peer_delete;
255
256 for (retry = 0; retry < LIBRPMA_FIO_RETRY_MAX_NO; retry++) {
257 if ((ret = rpma_conn_req_new(ccd->peer, o->server_ip, port_td,
258 cfg, &req))) {
259 librpma_td_verror(td, ret, "rpma_conn_req_new");
260 goto err_peer_delete;
261 }
262
263 /*
264 * Connect the connection request
265 * and obtain the connection object.
266 */
267 if ((ret = rpma_conn_req_connect(&req, NULL, &ccd->conn))) {
268 librpma_td_verror(td, ret, "rpma_conn_req_connect");
269 goto err_req_delete;
270 }
271
272 /* wait for the connection to establish */
273 if ((ret = rpma_conn_next_event(ccd->conn, &event))) {
274 librpma_td_verror(td, ret, "rpma_conn_next_event");
275 goto err_conn_delete;
276 } else if (event == RPMA_CONN_ESTABLISHED) {
277 break;
278 } else if (event == RPMA_CONN_REJECTED) {
279 (void) rpma_conn_disconnect(ccd->conn);
280 (void) rpma_conn_delete(&ccd->conn);
281 if (retry < LIBRPMA_FIO_RETRY_MAX_NO - 1) {
282 log_err("Thread [%d]: Retrying (#%i) ...\n",
283 td->thread_number, retry + 1);
284 sleep(LIBRPMA_FIO_RETRY_DELAY_S);
285 } else {
286 log_err(
287 "Thread [%d]: The maximum number of retries exceeded. Closing.\n",
288 td->thread_number);
289 }
290 } else {
291 log_err(
292 "rpma_conn_next_event returned an unexptected event: (%s != RPMA_CONN_ESTABLISHED)\n",
293 rpma_utils_conn_event_2str(event));
294 goto err_conn_delete;
295 }
296 }
297
298 if (retry > 0)
299 log_err("Thread [%d]: Connected after retry #%i\n",
300 td->thread_number, retry);
301
302 if (ccd->conn == NULL)
303 goto err_peer_delete;
304
305 /* get the connection's main CQ */
306 if ((ret = rpma_conn_get_cq(ccd->conn, &ccd->cq))) {
307 librpma_td_verror(td, ret, "rpma_conn_get_cq");
308 goto err_conn_delete;
309 }
310
311 /* get the connection's private data sent from the server */
312 if ((ret = rpma_conn_get_private_data(ccd->conn, &pdata))) {
313 librpma_td_verror(td, ret, "rpma_conn_get_private_data");
314 goto err_conn_delete;
315 }
316
317 /* get the server's workspace representation */
318 ccd->ws = pdata.ptr;
319
320 /* create the server's memory representation */
321 if ((ret = rpma_mr_remote_from_descriptor(&ccd->ws->descriptor[0],
322 ccd->ws->mr_desc_size, &ccd->server_mr))) {
323 librpma_td_verror(td, ret, "rpma_mr_remote_from_descriptor");
324 goto err_conn_delete;
325 }
326
327 /* get the total size of the shared server memory */
328 if ((ret = rpma_mr_remote_get_size(ccd->server_mr, &ccd->ws_size))) {
329 librpma_td_verror(td, ret, "rpma_mr_remote_get_size");
330 goto err_conn_delete;
331 }
332
333 /* get flush type of the remote node */
334 if ((ret = rpma_mr_remote_get_flush_type(ccd->server_mr,
335 &remote_flush_type))) {
336 librpma_td_verror(td, ret, "rpma_mr_remote_get_flush_type");
337 goto err_conn_delete;
338 }
339
340 ccd->server_mr_flush_type =
341 (remote_flush_type & RPMA_MR_USAGE_FLUSH_TYPE_PERSISTENT) ?
342 RPMA_FLUSH_TYPE_PERSISTENT : RPMA_FLUSH_TYPE_VISIBILITY;
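/*
 * In other words: when the remote memory region was registered with
 * RPMA_MR_USAGE_FLUSH_TYPE_PERSISTENT (the server's PMem case), the client
 * requests persistent flushes; otherwise it falls back to visibility
 * flushes (the DRAM case). See usage_mem_type in
 * librpma_fio_server_open_file() below.
 */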
343
344 /*
345 * Ensure the io_us buffer allocation is page-size-aligned, which is
346 * required for RDMA registration. The user-provided value is intentionally ignored.
347 */
348 td->o.mem_align = page_size;
349
350 td->io_ops_data = ccd;
351
352 return 0;
353
354err_conn_delete:
355 (void) rpma_conn_disconnect(ccd->conn);
356 (void) rpma_conn_delete(&ccd->conn);
357
358err_req_delete:
359 (void) rpma_conn_req_delete(&req);
360
361err_peer_delete:
362 (void) rpma_peer_delete(&ccd->peer);
363
364err_free_io_u_queues:
365 free(ccd->io_us_queued);
366 free(ccd->io_us_flight);
367 free(ccd->io_us_completed);
368
369err_free_ccd:
370 free(ccd);
371
372 return -1;
373}
374
375void librpma_fio_client_cleanup(struct thread_data *td)
376{
377 struct librpma_fio_client_data *ccd = td->io_ops_data;
378 enum rpma_conn_event ev;
379 int ret;
380
381 if (ccd == NULL)
382 return;
383
384 /* delete the io_us' memory registration */
385 if ((ret = rpma_mr_dereg(&ccd->orig_mr)))
386 librpma_td_verror(td, ret, "rpma_mr_dereg");
387 /* delete the server's memory representation */
388 if ((ret = rpma_mr_remote_delete(&ccd->server_mr)))
389 librpma_td_verror(td, ret, "rpma_mr_remote_delete");
390 /* initiate disconnection */
391 if ((ret = rpma_conn_disconnect(ccd->conn)))
392 librpma_td_verror(td, ret, "rpma_conn_disconnect");
393 /* wait for the disconnection to complete */
394 if ((ret = rpma_conn_next_event(ccd->conn, &ev))) {
395 librpma_td_verror(td, ret, "rpma_conn_next_event");
396 } else if (ev != RPMA_CONN_CLOSED) {
397 log_err(
398 "client_cleanup received an unexpected event (%s != RPMA_CONN_CLOSED)\n",
399 rpma_utils_conn_event_2str(ev));
400 }
401 /* delete the connection */
402 if ((ret = rpma_conn_delete(&ccd->conn)))
403 librpma_td_verror(td, ret, "rpma_conn_delete");
404 /* delete the peer */
405 if ((ret = rpma_peer_delete(&ccd->peer)))
406 librpma_td_verror(td, ret, "rpma_peer_delete");
407 /* free the software queues */
408 free(ccd->io_us_queued);
409 free(ccd->io_us_flight);
410 free(ccd->io_us_completed);
411 free(ccd);
412 td->io_ops_data = NULL; /* zero ccd */
413}
414
415int librpma_fio_file_nop(struct thread_data *td, struct fio_file *f)
416{
417 /* NOP */
418 return 0;
419}
420
421int librpma_fio_client_post_init(struct thread_data *td)
422{
423 struct librpma_fio_client_data *ccd = td->io_ops_data;
424 size_t io_us_size;
425 int ret;
426
427 /*
428 * td->orig_buffer is not aligned. The engine requires aligned io_us
429 * so FIO aligns up the address using the formula below.
430 */
431 ccd->orig_buffer_aligned = PTR_ALIGN(td->orig_buffer, page_mask) +
432 td->o.mem_align;
433
434 /*
435 * Besides the space actually consumed by io_us, td->orig_buffer_size
436 * also includes padding which can be omitted from the memory registration.
437 */
438 io_us_size = (unsigned long long)td_max_bs(td) *
439 (unsigned long long)td->o.iodepth;
440
441 if ((ret = rpma_mr_reg(ccd->peer, ccd->orig_buffer_aligned, io_us_size,
442 RPMA_MR_USAGE_READ_DST | RPMA_MR_USAGE_READ_SRC |
443 RPMA_MR_USAGE_WRITE_DST | RPMA_MR_USAGE_WRITE_SRC |
444 RPMA_MR_USAGE_FLUSH_TYPE_PERSISTENT, &ccd->orig_mr)))
445 librpma_td_verror(td, ret, "rpma_mr_reg");
446 return ret;
447}
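/*
 * For illustration (hypothetical job parameters): with bs=4k and iodepth=16
 * the registered region covers 16 * 4 KiB = 64 KiB starting at
 * orig_buffer_aligned; td_max_bs() is used so that mixed block sizes
 * register the worst-case footprint.
 */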
448
449int librpma_fio_client_get_file_size(struct thread_data *td,
450 struct fio_file *f)
451{
452 struct librpma_fio_client_data *ccd = td->io_ops_data;
453
454 f->real_file_size = ccd->ws_size;
455 fio_file_set_size_known(f);
456
457 return 0;
458}
459
460static enum fio_q_status client_queue_sync(struct thread_data *td,
461 struct io_u *io_u)
462{
463 struct librpma_fio_client_data *ccd = td->io_ops_data;
464 struct ibv_wc wc;
465 unsigned io_u_index;
466 int ret;
467
468 /* execute io_u */
469 if (io_u->ddir == DDIR_READ) {
470 /* post an RDMA read operation */
471 if (librpma_fio_client_io_read(td, io_u,
472 RPMA_F_COMPLETION_ALWAYS))
473 goto err;
474 } else if (io_u->ddir == DDIR_WRITE) {
475 /* post an RDMA write operation */
476 if (librpma_fio_client_io_write(td, io_u))
477 goto err;
478 if (ccd->flush(td, io_u, io_u, io_u->xfer_buflen))
479 goto err;
480 } else {
481 log_err("unsupported IO mode: %s\n", io_ddir_name(io_u->ddir));
482 goto err;
483 }
484
485 do {
486 /* get a completion */
487 ret = rpma_cq_get_wc(ccd->cq, 1, &wc, NULL);
488 if (ret == RPMA_E_NO_COMPLETION) {
489 /* lack of completion is not an error */
490 continue;
491 } else if (ret != 0) {
492 /* an error occurred */
493 librpma_td_verror(td, ret, "rpma_cq_get_wc");
494 goto err;
495 }
496
497 /* if the io_u has completed with an error */
498 if (wc.status != IBV_WC_SUCCESS)
499 goto err;
500
501 if (wc.opcode == IBV_WC_SEND)
502 ++ccd->op_send_completed;
503 else {
504 if (wc.opcode == IBV_WC_RECV)
505 ++ccd->op_recv_completed;
506
507 break;
508 }
509 } while (1);
510
511 if (ccd->get_io_u_index(&wc, &io_u_index) != 1)
512 goto err;
513
514 if (io_u->index != io_u_index) {
515 log_err(
516 "no matching io_u for received completion found (io_u_index=%u)\n",
517 io_u_index);
518 goto err;
519 }
520
521 /* make sure all SENDs are completed before exit - clean up SQ */
522 if (librpma_fio_client_io_complete_all_sends(td))
523 goto err;
524
525 return FIO_Q_COMPLETED;
526
527err:
528 io_u->error = -1;
529 return FIO_Q_COMPLETED;
530}
531
532enum fio_q_status librpma_fio_client_queue(struct thread_data *td,
533 struct io_u *io_u)
534{
535 struct librpma_fio_client_data *ccd = td->io_ops_data;
536
537 if (ccd->io_u_queued_nr == (int)td->o.iodepth)
538 return FIO_Q_BUSY;
539
540 if (td->o.sync_io)
541 return client_queue_sync(td, io_u);
542
543 /* io_u -> queued[] */
544 ccd->io_us_queued[ccd->io_u_queued_nr] = io_u;
545 ccd->io_u_queued_nr++;
546
547 return FIO_Q_QUEUED;
548}
549
550int librpma_fio_client_commit(struct thread_data *td)
551{
552 struct librpma_fio_client_data *ccd = td->io_ops_data;
553 int flags = RPMA_F_COMPLETION_ON_ERROR;
554 struct timespec now;
555 bool fill_time;
556 int i;
557 struct io_u *flush_first_io_u = NULL;
558 unsigned long long int flush_len = 0;
559
560 if (!ccd->io_us_queued)
561 return -1;
562
563 /* execute all io_us from queued[] */
564 for (i = 0; i < ccd->io_u_queued_nr; i++) {
565 struct io_u *io_u = ccd->io_us_queued[i];
566
567 if (io_u->ddir == DDIR_READ) {
568 if (i + 1 == ccd->io_u_queued_nr ||
569 ccd->io_us_queued[i + 1]->ddir == DDIR_WRITE)
570 flags = RPMA_F_COMPLETION_ALWAYS;
571 /* post an RDMA read operation */
572 if (librpma_fio_client_io_read(td, io_u, flags))
573 return -1;
574 } else if (io_u->ddir == DDIR_WRITE) {
575 /* post an RDMA write operation */
576 if (librpma_fio_client_io_write(td, io_u))
577 return -1;
578
579 /* cache the first io_u in the sequence */
580 if (flush_first_io_u == NULL)
581 flush_first_io_u = io_u;
582
583 /*
584 * the flush length is the sum of all io_u's creating
585 * the sequence
586 */
587 flush_len += io_u->xfer_buflen;
588
589 /*
590 * if io_u's are random the rpma_flush is required
591 * after each one of them
592 */
593 if (!td_random(td)) {
594 /*
595 * When the io_u's are sequential and
596 * the current io_u is not the last one and
597 * the next one is also a write operation
598 * the flush can be postponed by one io_u and
599 * cover all of them which build a continuous
600 * sequence.
601 */
602 if ((i + 1 < ccd->io_u_queued_nr) &&
603 (ccd->io_us_queued[i + 1]->ddir == DDIR_WRITE))
604 continue;
605 }
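/*
 * Illustration (hypothetical queue contents): for a sequential workload
 * queued as W(4k), W(4k), R, W(4k), one flush covering 8k is posted after
 * the second write and a separate flush covering 4k follows the last
 * write; for a random workload every write is followed by its own flush.
 */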
606
607 /* flush all writes which build a continuous sequence */
608 if (ccd->flush(td, flush_first_io_u, io_u, flush_len))
609 return -1;
610
611 /*
612 * reset the flush parameters in preparation for
613 * the next one
614 */
615 flush_first_io_u = NULL;
616 flush_len = 0;
617 } else {
618 log_err("unsupported IO mode: %s\n",
619 io_ddir_name(io_u->ddir));
620 return -1;
621 }
622 }
623
624 if ((fill_time = fio_fill_issue_time(td))) {
625 fio_gettime(&now, NULL);
626
627 /*
628 * only used for iolog
629 */
630 if (td->o.read_iolog_file)
631 memcpy(&td->last_issue, &now, sizeof(now));
632
633 }
634 /* move executed io_us from queued[] to flight[] */
635 for (i = 0; i < ccd->io_u_queued_nr; i++) {
636 struct io_u *io_u = ccd->io_us_queued[i];
637
638 /* FIO does not do this if the engine is asynchronous */
639 if (fill_time)
640 memcpy(&io_u->issue_time, &now, sizeof(now));
641
642 /* move executed io_us from queued[] to flight[] */
643 ccd->io_us_flight[ccd->io_u_flight_nr] = io_u;
644 ccd->io_u_flight_nr++;
645
646 /*
647 * FIO says:
648 * If an engine has the commit hook
649 * it has to call io_u_queued() itself.
650 */
651 io_u_queued(td, io_u);
652 }
653
654 /* FIO does not do this if an engine has the commit hook. */
655 io_u_mark_submit(td, ccd->io_u_queued_nr);
656 ccd->io_u_queued_nr = 0;
657
658 return 0;
659}
660
661/*
662 * RETURN VALUE
663 * - > 0 - the number of completed io_us
664 * - 0 - when no completions were received
665 * - (-1) - when an error occurred
666 */
667static int client_getevent_process(struct thread_data *td)
668{
669 struct librpma_fio_client_data *ccd = td->io_ops_data;
670 struct ibv_wc wc;
671 /* io_u->index of completed io_u (wc.wr_id) */
672 unsigned int io_u_index;
673 /* # of completed io_us */
674 int cmpl_num = 0;
675 /* helpers */
676 struct io_u *io_u;
677 int i;
678 int ret;
679
680 /* get a completion */
681 if ((ret = rpma_cq_get_wc(ccd->cq, 1, &wc, NULL))) {
683 if (ret == RPMA_E_NO_COMPLETION) {
684 /* lack of completion is not an error */
685 return 0;
686 }
687
688 /* an error occurred */
689 librpma_td_verror(td, ret, "rpma_cq_get_wc");
690 return -1;
691 }
692
693 /* if the io_u has completed with an error */
694 if (wc.status != IBV_WC_SUCCESS) {
695 td->error = wc.status;
696 return -1;
697 }
698
699 if (wc.opcode == IBV_WC_SEND)
700 ++ccd->op_send_completed;
701 else if (wc.opcode == IBV_WC_RECV)
702 ++ccd->op_recv_completed;
703
704 if ((ret = ccd->get_io_u_index(&wc, &io_u_index)) != 1)
705 return ret;
706
707 /* look for an io_u being completed */
708 for (i = 0; i < ccd->io_u_flight_nr; ++i) {
709 if (ccd->io_us_flight[i]->index == io_u_index) {
710 cmpl_num = i + 1;
711 break;
712 }
713 }
714
715 /* if no matching io_u has been found */
716 if (cmpl_num == 0) {
717 log_err(
718 "no matching io_u for received completion found (io_u_index=%u)\n",
719 io_u_index);
720 return -1;
721 }
722
723 /* move completed io_us to the completed in-memory queue */
724 for (i = 0; i < cmpl_num; ++i) {
725 /* get and prepare io_u */
726 io_u = ccd->io_us_flight[i];
727
728 /* append to the queue */
729 ccd->io_us_completed[ccd->io_u_completed_nr] = io_u;
730 ccd->io_u_completed_nr++;
731 }
732
733 /* remove completed io_us from the flight queue */
734 for (i = cmpl_num; i < ccd->io_u_flight_nr; ++i)
735 ccd->io_us_flight[i - cmpl_num] = ccd->io_us_flight[i];
736 ccd->io_u_flight_nr -= cmpl_num;
737
738 return cmpl_num;
739}
740
741int librpma_fio_client_getevents(struct thread_data *td, unsigned int min,
742 unsigned int max, const struct timespec *t)
743{
744 struct librpma_fio_client_data *ccd = td->io_ops_data;
745 /* total # of completed io_us */
746 int cmpl_num_total = 0;
747 /* # of completed io_us from a single event */
748 int cmpl_num;
749
750 do {
751 cmpl_num = client_getevent_process(td);
752 if (cmpl_num > 0) {
753 /* new completions collected */
754 cmpl_num_total += cmpl_num;
755 } else if (cmpl_num == 0) {
756 /*
757 * It is required to make sure that CQEs for SENDs
758 * will flow at least at the same pace as CQEs for RECVs.
759 */
760 if (cmpl_num_total >= min &&
761 ccd->op_send_completed >= ccd->op_recv_completed)
762 break;
763
764 /*
765 * To reduce CPU consumption one can use
766 * the rpma_cq_wait() function.
767 * Note this greatly increases the latency
768 * and makes the results less stable.
769 * The bandwidth stays more or less the same.
770 */
771 } else {
772 /* an error occurred */
773 return -1;
774 }
775
776 /*
777 * The expected max can be exceeded if CQEs for RECVs come up
778 * faster than CQEs for SENDs. But it is required to make sure CQEs for
779 * SENDs flow at least at the same pace as CQEs for RECVs.
780 */
781 } while (cmpl_num_total < max ||
782 ccd->op_send_completed < ccd->op_recv_completed);
783
784 /*
785 * All posted SENDs are completed and RECVs for them (responses) are
786 * completed. This is the initial situation so the counters are reset.
787 */
788 if (ccd->op_send_posted == ccd->op_send_completed &&
789 ccd->op_send_completed == ccd->op_recv_completed) {
790 ccd->op_send_posted = 0;
791 ccd->op_send_completed = 0;
792 ccd->op_recv_completed = 0;
793 }
794
795 return cmpl_num_total;
796}
797
798struct io_u *librpma_fio_client_event(struct thread_data *td, int event)
799{
800 struct librpma_fio_client_data *ccd = td->io_ops_data;
801 struct io_u *io_u;
802 int i;
803
804 /* get the first io_u from the queue */
805 io_u = ccd->io_us_completed[0];
806
807 /* remove the first io_u from the queue */
808 for (i = 1; i < ccd->io_u_completed_nr; ++i)
809 ccd->io_us_completed[i - 1] = ccd->io_us_completed[i];
810 ccd->io_u_completed_nr--;
811
812 dprint_io_u(io_u, "client_event");
813
814 return io_u;
815}
816
817char *librpma_fio_client_errdetails(struct io_u *io_u)
818{
819 /* get the string representation of an error */
820 enum ibv_wc_status status = io_u->error;
821 const char *status_str = ibv_wc_status_str(status);
822
823 char *details = strdup(status_str);
824 if (details == NULL) {
825 fprintf(stderr, "Error: %s\n", status_str);
826 fprintf(stderr, "Fatal error: out of memory. Aborting.\n");
827 abort();
828 }
829
830 /* FIO frees the returned string when it becomes obsolete */
831 return details;
832}
833
834int librpma_fio_server_init(struct thread_data *td)
835{
836 struct librpma_fio_options_values *o = td->eo;
837 struct librpma_fio_server_data *csd;
838 struct ibv_context *dev = NULL;
839 enum rpma_log_level log_level_aux = RPMA_LOG_LEVEL_WARNING;
840 int ret = -1;
841
842 /* --debug=net sets RPMA_LOG_THRESHOLD_AUX to RPMA_LOG_LEVEL_INFO */
843#ifdef FIO_INC_DEBUG
844 if ((1UL << FD_NET) & fio_debug)
845 log_level_aux = RPMA_LOG_LEVEL_INFO;
846#endif
847
848 /* configure logging thresholds to see more details */
849 rpma_log_set_threshold(RPMA_LOG_THRESHOLD, RPMA_LOG_LEVEL_INFO);
850 rpma_log_set_threshold(RPMA_LOG_THRESHOLD_AUX, log_level_aux);
851
852
853 /* obtain an IBV context for a local IP address */
854 if ((ret = rpma_utils_get_ibv_context(o->server_ip,
855 RPMA_UTIL_IBV_CONTEXT_LOCAL, &dev))) {
856 librpma_td_verror(td, ret, "rpma_utils_get_ibv_context");
857 return -1;
858 }
859
860 /* allocate server's data */
861 csd = calloc(1, sizeof(*csd));
862 if (csd == NULL) {
863 td_verror(td, errno, "calloc");
864 return -1;
865 }
866
867 /* create a new peer object */
868 if ((ret = rpma_peer_new(dev, &csd->peer))) {
869 librpma_td_verror(td, ret, "rpma_peer_new");
870 goto err_free_csd;
871 }
872
873 td->io_ops_data = csd;
874
875 return 0;
876
877err_free_csd:
878 free(csd);
879
880 return -1;
881}
882
883void librpma_fio_server_cleanup(struct thread_data *td)
884{
885 struct librpma_fio_server_data *csd = td->io_ops_data;
886 int ret;
887
888 if (csd == NULL)
889 return;
890
891 /* free the peer */
892 if ((ret = rpma_peer_delete(&csd->peer)))
893 librpma_td_verror(td, ret, "rpma_peer_delete");
894
895 free(csd);
896}
897
898int librpma_fio_server_open_file(struct thread_data *td, struct fio_file *f,
899 struct rpma_conn_cfg *cfg)
900{
901 struct librpma_fio_server_data *csd = td->io_ops_data;
902 struct librpma_fio_options_values *o = td->eo;
903 enum rpma_conn_event conn_event = RPMA_CONN_UNDEFINED;
904 struct librpma_fio_workspace ws = {0};
905 struct rpma_conn_private_data pdata;
906 uint32_t max_msg_num;
907 struct rpma_conn_req *conn_req;
908 struct rpma_conn *conn;
909 struct rpma_mr_local *mr;
910 char port_td[LIBRPMA_FIO_PORT_STR_LEN_MAX];
911 struct rpma_ep *ep;
912 size_t mem_size = td->o.size;
913 size_t mr_desc_size;
914 void *ws_ptr;
915 bool is_dram;
916 int usage_mem_type;
917 int ret;
918
919 if (!f->file_name) {
920 log_err("fio: filename is not set\n");
921 return -1;
922 }
923
924 /* start a listening endpoint at addr:port */
925 if (librpma_fio_td_port(o->port, td, port_td))
926 return -1;
927
928 if ((ret = rpma_ep_listen(csd->peer, o->server_ip, port_td, &ep))) {
929 librpma_td_verror(td, ret, "rpma_ep_listen");
930 return -1;
931 }
932
933 is_dram = !strcmp(f->file_name, "malloc");
934 if (is_dram) {
935 /* allocation from DRAM using posix_memalign() */
936 ws_ptr = librpma_fio_allocate_dram(td, mem_size, &csd->mem);
937 usage_mem_type = RPMA_MR_USAGE_FLUSH_TYPE_VISIBILITY;
938 } else {
939 /* allocation from PMEM using pmem_map_file() */
940 ws_ptr = librpma_fio_allocate_pmem(td, f, mem_size, &csd->mem);
941 usage_mem_type = RPMA_MR_USAGE_FLUSH_TYPE_PERSISTENT;
942 }
943
944 if (ws_ptr == NULL)
945 goto err_ep_shutdown;
946
947 f->real_file_size = mem_size;
948
949 if ((ret = rpma_mr_reg(csd->peer, ws_ptr, mem_size,
950 RPMA_MR_USAGE_READ_DST | RPMA_MR_USAGE_READ_SRC |
951 RPMA_MR_USAGE_WRITE_DST | RPMA_MR_USAGE_WRITE_SRC |
952 usage_mem_type, &mr))) {
953 librpma_td_verror(td, ret, "rpma_mr_reg");
954 goto err_free;
955 }
956
957 if (!is_dram && f->filetype == FIO_TYPE_FILE) {
958 ret = rpma_mr_advise(mr, 0, mem_size,
959 IBV_ADVISE_MR_ADVICE_PREFETCH_WRITE,
960 IBV_ADVISE_MR_FLAG_FLUSH);
961 if (ret) {
962 librpma_td_verror(td, ret, "rpma_mr_advise");
963 /* an invalid argument is an error */
964 if (ret == RPMA_E_INVAL)
965 goto err_mr_dereg;
966
967 /* log_err used instead of log_info to avoid corruption of the JSON output */
968 log_err("Note: having rpma_mr_advise(3) failed because of RPMA_E_NOSUPP or RPMA_E_PROVIDER may come with a performance penalty, but it is not a blocker for running the benchmark.\n");
969 }
970 }
971
972 /* get size of the memory region's descriptor */
973 if ((ret = rpma_mr_get_descriptor_size(mr, &mr_desc_size))) {
974 librpma_td_verror(td, ret, "rpma_mr_get_descriptor_size");
975 goto err_mr_dereg;
976 }
977
978 /* verify size of the memory region's descriptor */
979 if (mr_desc_size > LIBRPMA_FIO_DESCRIPTOR_MAX_SIZE) {
980 log_err(
981 "size of the memory region's descriptor is too big (max=%i)\n",
982 LIBRPMA_FIO_DESCRIPTOR_MAX_SIZE);
983 goto err_mr_dereg;
984 }
985
986 /* get the memory region's descriptor */
987 if ((ret = rpma_mr_get_descriptor(mr, &ws.descriptor[0]))) {
988 librpma_td_verror(td, ret, "rpma_mr_get_descriptor");
989 goto err_mr_dereg;
990 }
991
992 if (cfg != NULL) {
993 if ((ret = rpma_conn_cfg_get_rq_size(cfg, &max_msg_num))) {
994 librpma_td_verror(td, ret, "rpma_conn_cfg_get_rq_size");
995 goto err_mr_dereg;
996 }
997
998 /* verify whether iodepth fits into uint16_t */
999 if (max_msg_num > UINT16_MAX) {
1000 log_err("fio: iodepth too big (%u > %u)\n",
1001 max_msg_num, UINT16_MAX);
1002 goto err_mr_dereg;
1003 }
1004
1005 ws.max_msg_num = max_msg_num;
1006 }
1007
1008 /* prepare a workspace description */
1009 ws.direct_write_to_pmem = o->direct_write_to_pmem;
1010 ws.mr_desc_size = mr_desc_size;
1011 pdata.ptr = &ws;
1012 pdata.len = sizeof(ws);
1013
1014 /* receive an incoming connection request */
1015 if ((ret = rpma_ep_next_conn_req(ep, cfg, &conn_req))) {
1016 librpma_td_verror(td, ret, "rpma_ep_next_conn_req");
1017 goto err_mr_dereg;
1018 }
1019
1020 if (csd->prepare_connection && csd->prepare_connection(td, conn_req))
1021 goto err_req_delete;
1022
1023 /* accept the connection request and obtain the connection object */
1024 if ((ret = rpma_conn_req_connect(&conn_req, &pdata, &conn))) {
1025 librpma_td_verror(td, ret, "rpma_conn_req_connect");
1026 goto err_req_delete;
1027 }
1028
1029 /* wait for the connection to be established */
1030 if ((ret = rpma_conn_next_event(conn, &conn_event))) {
1031 librpma_td_verror(td, ret, "rpma_conn_next_event");
1032 goto err_conn_delete;
1033 } else if (conn_event != RPMA_CONN_ESTABLISHED) {
1034 log_err("rpma_conn_next_event returned an unexpected event\n");
1035 goto err_conn_delete;
1036 }
1037
1038 /* end-point is no longer needed */
1039 (void) rpma_ep_shutdown(&ep);
1040
1041 csd->ws_mr = mr;
1042 csd->ws_ptr = ws_ptr;
1043 csd->conn = conn;
1044
1045 /* get the connection's main CQ */
1046 if ((ret = rpma_conn_get_cq(csd->conn, &csd->cq))) {
1047 librpma_td_verror(td, ret, "rpma_conn_get_cq");
1048 goto err_conn_delete;
1049 }
1050
1051 return 0;
1052
1053err_conn_delete:
1054 (void) rpma_conn_delete(&conn);
1055
1056err_req_delete:
1057 (void) rpma_conn_req_delete(&conn_req);
1058
1059err_mr_dereg:
1060 (void) rpma_mr_dereg(&mr);
1061
1062err_free:
1063 librpma_fio_free(&csd->mem);
1064
1065err_ep_shutdown:
1066 (void) rpma_ep_shutdown(&ep);
1067
1068 return -1;
1069}
1070
1071int librpma_fio_server_close_file(struct thread_data *td, struct fio_file *f)
1072{
1073 struct librpma_fio_server_data *csd = td->io_ops_data;
1074 enum rpma_conn_event conn_event = RPMA_CONN_UNDEFINED;
1075 int rv = 0;
1076 int ret;
1077
1078 /* wait for the connection to be closed */
1079 ret = rpma_conn_next_event(csd->conn, &conn_event);
1080 if (!ret && conn_event != RPMA_CONN_CLOSED) {
1081 log_err("rpma_conn_next_event returned an unexpected event\n");
1082 rv = -1;
1083 }
1084
1085 if ((ret = rpma_conn_disconnect(csd->conn))) {
1086 librpma_td_verror(td, ret, "rpma_conn_disconnect");
1087 rv = -1;
1088 }
1089
1090 if ((ret = rpma_conn_delete(&csd->conn))) {
1091 librpma_td_verror(td, ret, "rpma_conn_delete");
1092 rv = -1;
1093 }
1094
1095 if ((ret = rpma_mr_dereg(&csd->ws_mr))) {
1096 librpma_td_verror(td, ret, "rpma_mr_dereg");
1097 rv = -1;
1098 }
1099
1100 librpma_fio_free(&csd->mem);
1101
1102 return rv;
1103}