/*
 * librpma_fio: librpma_apm and librpma_gpspm engines' common part.
 *
 * Copyright 2021-2022, Intel Corporation
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License,
 * version 2 as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 */

#ifdef CONFIG_LIBPMEM2_INSTALLED
#include "librpma_fio_pmem2.h"
#else
#include "librpma_fio_pmem.h"
#endif /* CONFIG_LIBPMEM2_INSTALLED */

struct fio_option librpma_fio_options[] = {
	{
		.name = "serverip",
		.lname = "rpma_server_ip",
		.type = FIO_OPT_STR_STORE,
		.off1 = offsetof(struct librpma_fio_options_values, server_ip),
		.help = "IP address the server is listening on",
		.def = "",
		.category = FIO_OPT_C_ENGINE,
		.group = FIO_OPT_G_LIBRPMA,
	},
	{
		.name = "port",
		.lname = "rpma_server port",
		.type = FIO_OPT_STR_STORE,
		.off1 = offsetof(struct librpma_fio_options_values, port),
		.help = "port the server is listening on",
		.def = "7204",
		.category = FIO_OPT_C_ENGINE,
		.group = FIO_OPT_G_LIBRPMA,
	},
	{
		.name = "direct_write_to_pmem",
		.lname = "Direct Write to PMem (via RDMA) from the remote host is possible",
		.type = FIO_OPT_BOOL,
		.off1 = offsetof(struct librpma_fio_options_values,
				direct_write_to_pmem),
		.help = "Set to true ONLY when Direct Write to PMem from the remote host is possible (https://pmem.io/rpma/documentation/basic-direct-write-to-pmem.html)",
		.def = "",
		.category = FIO_OPT_C_ENGINE,
		.group = FIO_OPT_G_LIBRPMA,
	},
	{
		.name = "busy_wait_polling",
		.lname = "Set to 0 to wait for completion instead of busy-wait polling completion.",
		.type = FIO_OPT_BOOL,
		.off1 = offsetof(struct librpma_fio_options_values,
				busy_wait_polling),
		.help = "Set to false if you want to reduce CPU usage",
		.def = "1",
		.category = FIO_OPT_C_ENGINE,
		.group = FIO_OPT_G_LIBRPMA,
	},
	{
		.name = NULL,
	},
};
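
/*
 * Illustrative example (not part of the engine code): a client-side job
 * consuming the options above could look roughly like the fragment below.
 * The engine name, address and values are placeholders; see the
 * examples/librpma_*.fio job files shipped with fio for complete,
 * up-to-date configurations.
 *
 *	[client]
 *	ioengine=librpma_apm_client
 *	serverip=192.168.0.1
 *	port=7204
 *	busy_wait_polling=1
 */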

int librpma_fio_td_port(const char *port_base_str, struct thread_data *td,
		char *port_out)
{
	unsigned long int port_ul = strtoul(port_base_str, NULL, 10);
	unsigned int port_new;

	port_out[0] = '\0';

	if (port_ul == ULONG_MAX) {
		td_verror(td, errno, "strtoul");
		return -1;
	}
	port_ul += td->thread_number - 1;
	if (port_ul >= UINT_MAX) {
		log_err("[%u] port number (%lu) bigger than UINT_MAX\n",
			td->thread_number, port_ul);
		return -1;
	}

	port_new = port_ul;
	snprintf(port_out, LIBRPMA_FIO_PORT_STR_LEN_MAX - 1, "%u", port_new);

	return 0;
}
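
/*
 * Illustrative note on librpma_fio_td_port() above: the base port is bumped
 * by (thread_number - 1), so with port=7204 and numjobs=3 the consecutive
 * threads use "7204", "7205" and "7206", and every client/server thread pair
 * gets its own listening port.
 */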

char *librpma_fio_allocate_dram(struct thread_data *td, size_t size,
		struct librpma_fio_mem *mem)
{
	char *mem_ptr = NULL;
	int ret;

	if ((ret = posix_memalign((void **)&mem_ptr, page_size, size))) {
		log_err("fio: posix_memalign() failed\n");
		td_verror(td, ret, "posix_memalign");
		return NULL;
	}

	mem->mem_ptr = mem_ptr;
	mem->size_mmap = 0;

	return mem_ptr;
}

char *librpma_fio_allocate_pmem(struct thread_data *td, struct fio_file *f,
		size_t size, struct librpma_fio_mem *mem)
{
	size_t ws_offset;
	mem->mem_ptr = NULL;

	if (size % page_size) {
		log_err("fio: size (%zu) is not aligned to page size (%zu)\n",
			size, page_size);
		return NULL;
	}

	if (f->filetype == FIO_TYPE_CHAR) {
		/* Each thread uses a separate offset within DeviceDAX. */
		ws_offset = (td->thread_number - 1) * size;
	} else {
		/* Each thread uses a separate FileSystemDAX file. No offset is needed. */
		ws_offset = 0;
	}

	if (!f->file_name) {
		log_err("fio: filename is not set\n");
		return NULL;
	}

	if (librpma_fio_pmem_map_file(f, size, mem, ws_offset)) {
		log_err("fio: librpma_fio_pmem_map_file(%s) failed\n",
			f->file_name);
		return NULL;
	}

	log_info("fio: size of memory mapped from the file %s: %zu\n",
		f->file_name, mem->size_mmap);

	log_info("fio: library used to map PMem from file: %s\n", RPMA_PMEM_USED);

	return mem->mem_ptr ? mem->mem_ptr + ws_offset : NULL;
}
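
/*
 * Illustrative note on the offset logic above: with a 1 GiB workspace per
 * thread on a DeviceDAX (FIO_TYPE_CHAR) device, thread 1 maps at offset 0,
 * thread 2 at 1 GiB, thread 3 at 2 GiB, and so on. FileSystemDAX jobs use
 * a separate file per thread instead, so their offset is always 0.
 */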

void librpma_fio_free(struct librpma_fio_mem *mem)
{
	if (mem->size_mmap)
		librpma_fio_unmap(mem);
	else
		free(mem->mem_ptr);
}

#define LIBRPMA_FIO_RETRY_MAX_NO 10
#define LIBRPMA_FIO_RETRY_DELAY_S 5

int librpma_fio_client_init(struct thread_data *td,
		struct rpma_conn_cfg *cfg)
{
	struct librpma_fio_client_data *ccd;
	struct librpma_fio_options_values *o = td->eo;
	struct ibv_context *dev = NULL;
	char port_td[LIBRPMA_FIO_PORT_STR_LEN_MAX];
	struct rpma_conn_req *req = NULL;
	enum rpma_conn_event event;
	struct rpma_conn_private_data pdata;
	enum rpma_log_level log_level_aux = RPMA_LOG_LEVEL_WARNING;
	int remote_flush_type;
	int retry;
	int ret;

	/* --debug=net sets RPMA_LOG_THRESHOLD_AUX to RPMA_LOG_LEVEL_INFO */
#ifdef FIO_INC_DEBUG
	if ((1UL << FD_NET) & fio_debug)
		log_level_aux = RPMA_LOG_LEVEL_INFO;
#endif

	/* configure logging thresholds to see more details */
	rpma_log_set_threshold(RPMA_LOG_THRESHOLD, RPMA_LOG_LEVEL_INFO);
	rpma_log_set_threshold(RPMA_LOG_THRESHOLD_AUX, log_level_aux);

	/* obtain an IBV context for a remote IP address */
	if ((ret = rpma_utils_get_ibv_context(o->server_ip,
			RPMA_UTIL_IBV_CONTEXT_REMOTE, &dev))) {
		librpma_td_verror(td, ret, "rpma_utils_get_ibv_context");
		return -1;
	}

	/* allocate client's data */
	ccd = calloc(1, sizeof(*ccd));
	if (ccd == NULL) {
		td_verror(td, errno, "calloc");
		return -1;
	}

	/* allocate all in-memory queues */
	ccd->io_us_queued = calloc(td->o.iodepth, sizeof(*ccd->io_us_queued));
	if (ccd->io_us_queued == NULL) {
		td_verror(td, errno, "calloc");
		goto err_free_ccd;
	}

	ccd->io_us_flight = calloc(td->o.iodepth, sizeof(*ccd->io_us_flight));
	if (ccd->io_us_flight == NULL) {
		td_verror(td, errno, "calloc");
		goto err_free_io_u_queues;
	}

	ccd->io_us_completed = calloc(td->o.iodepth,
			sizeof(*ccd->io_us_completed));
	if (ccd->io_us_completed == NULL) {
		td_verror(td, errno, "calloc");
		goto err_free_io_u_queues;
	}

	/* create a new peer object */
	if ((ret = rpma_peer_new(dev, &ccd->peer))) {
		librpma_td_verror(td, ret, "rpma_peer_new");
		goto err_free_io_u_queues;
	}

	/* create a connection request */
	if (librpma_fio_td_port(o->port, td, port_td))
		goto err_peer_delete;

	for (retry = 0; retry < LIBRPMA_FIO_RETRY_MAX_NO; retry++) {
		if ((ret = rpma_conn_req_new(ccd->peer, o->server_ip, port_td,
				cfg, &req))) {
			librpma_td_verror(td, ret, "rpma_conn_req_new");
			goto err_peer_delete;
		}

		/*
		 * Connect the connection request
		 * and obtain the connection object.
		 */
		if ((ret = rpma_conn_req_connect(&req, NULL, &ccd->conn))) {
			librpma_td_verror(td, ret, "rpma_conn_req_connect");
			goto err_req_delete;
		}

		/* wait for the connection to be established */
		if ((ret = rpma_conn_next_event(ccd->conn, &event))) {
			librpma_td_verror(td, ret, "rpma_conn_next_event");
			goto err_conn_delete;
		} else if (event == RPMA_CONN_ESTABLISHED) {
			break;
		} else if (event == RPMA_CONN_REJECTED) {
			(void) rpma_conn_disconnect(ccd->conn);
			(void) rpma_conn_delete(&ccd->conn);
			if (retry < LIBRPMA_FIO_RETRY_MAX_NO - 1) {
				log_err("Thread [%d]: Retrying (#%i) ...\n",
					td->thread_number, retry + 1);
				sleep(LIBRPMA_FIO_RETRY_DELAY_S);
			} else {
				log_err(
					"Thread [%d]: The maximum number of retries has been exceeded. Closing.\n",
					td->thread_number);
			}
		} else {
			log_err(
				"rpma_conn_next_event returned an unexpected event: (%s != RPMA_CONN_ESTABLISHED)\n",
				rpma_utils_conn_event_2str(event));
			goto err_conn_delete;
		}
	}

	if (retry > 0)
		log_err("Thread [%d]: Connected after retry #%i\n",
			td->thread_number, retry);

	if (ccd->conn == NULL)
		goto err_peer_delete;

	/* get the connection's main CQ */
	if ((ret = rpma_conn_get_cq(ccd->conn, &ccd->cq))) {
		librpma_td_verror(td, ret, "rpma_conn_get_cq");
		goto err_conn_delete;
	}

	/* get the connection's private data sent from the server */
	if ((ret = rpma_conn_get_private_data(ccd->conn, &pdata))) {
		librpma_td_verror(td, ret, "rpma_conn_get_private_data");
		goto err_conn_delete;
	}

	/* get the server's workspace representation */
	ccd->ws = pdata.ptr;

	/* create the server's memory representation */
	if ((ret = rpma_mr_remote_from_descriptor(&ccd->ws->descriptor[0],
			ccd->ws->mr_desc_size, &ccd->server_mr))) {
		librpma_td_verror(td, ret, "rpma_mr_remote_from_descriptor");
		goto err_conn_delete;
	}

	/* get the total size of the shared server memory */
	if ((ret = rpma_mr_remote_get_size(ccd->server_mr, &ccd->ws_size))) {
		librpma_td_verror(td, ret, "rpma_mr_remote_get_size");
		goto err_conn_delete;
	}

	/* get flush type of the remote node */
	if ((ret = rpma_mr_remote_get_flush_type(ccd->server_mr,
			&remote_flush_type))) {
		librpma_td_verror(td, ret, "rpma_mr_remote_get_flush_type");
		goto err_conn_delete;
	}

	ccd->server_mr_flush_type =
		(remote_flush_type & RPMA_MR_USAGE_FLUSH_TYPE_PERSISTENT) ?
		RPMA_FLUSH_TYPE_PERSISTENT : RPMA_FLUSH_TYPE_VISIBILITY;

	/*
	 * Ensure that the io_u buffer allocation is page-size-aligned,
	 * which is required to register the memory for RDMA.
	 * The user-provided value is intentionally ignored.
	 */
	td->o.mem_align = page_size;

	td->io_ops_data = ccd;

	return 0;

err_conn_delete:
	(void) rpma_conn_disconnect(ccd->conn);
	(void) rpma_conn_delete(&ccd->conn);

err_req_delete:
	(void) rpma_conn_req_delete(&req);

err_peer_delete:
	(void) rpma_peer_delete(&ccd->peer);

err_free_io_u_queues:
	free(ccd->io_us_queued);
	free(ccd->io_us_flight);
	free(ccd->io_us_completed);

err_free_ccd:
	free(ccd);

	return -1;
}

void librpma_fio_client_cleanup(struct thread_data *td)
{
	struct librpma_fio_client_data *ccd = td->io_ops_data;
	enum rpma_conn_event ev;
	int ret;

	if (ccd == NULL)
		return;

	/* delete the io_us' memory registration */
	if ((ret = rpma_mr_dereg(&ccd->orig_mr)))
		librpma_td_verror(td, ret, "rpma_mr_dereg");
	/* delete the server's memory representation */
	if ((ret = rpma_mr_remote_delete(&ccd->server_mr)))
		librpma_td_verror(td, ret, "rpma_mr_remote_delete");
	/* initiate disconnection */
	if ((ret = rpma_conn_disconnect(ccd->conn)))
		librpma_td_verror(td, ret, "rpma_conn_disconnect");
	/* wait for the disconnection to complete */
	if ((ret = rpma_conn_next_event(ccd->conn, &ev))) {
		librpma_td_verror(td, ret, "rpma_conn_next_event");
	} else if (ev != RPMA_CONN_CLOSED) {
		log_err(
			"client_cleanup received an unexpected event (%s != RPMA_CONN_CLOSED)\n",
			rpma_utils_conn_event_2str(ev));
	}
	/* delete the connection */
	if ((ret = rpma_conn_delete(&ccd->conn)))
		librpma_td_verror(td, ret, "rpma_conn_delete");
	/* delete the peer */
	if ((ret = rpma_peer_delete(&ccd->peer)))
		librpma_td_verror(td, ret, "rpma_peer_delete");
	/* free the software queues */
	free(ccd->io_us_queued);
	free(ccd->io_us_flight);
	free(ccd->io_us_completed);
	free(ccd);
	td->io_ops_data = NULL; /* zero ccd */
}

int librpma_fio_file_nop(struct thread_data *td, struct fio_file *f)
{
	/* NOP */
	return 0;
}

int librpma_fio_client_post_init(struct thread_data *td)
{
	struct librpma_fio_client_data *ccd = td->io_ops_data;
	size_t io_us_size;
	int ret;

	/*
	 * td->orig_buffer is not aligned. The engine requires aligned io_us,
	 * so FIO aligns the address up using the formula below.
	 */
	ccd->orig_buffer_aligned = PTR_ALIGN(td->orig_buffer, page_mask) +
			td->o.mem_align;

	/*
	 * Besides the space actually consumed by the io_us,
	 * td->orig_buffer_size also includes padding which can be omitted
	 * from the memory registration.
	 */
	io_us_size = (unsigned long long)td_max_bs(td) *
			(unsigned long long)td->o.iodepth;

	if ((ret = rpma_mr_reg(ccd->peer, ccd->orig_buffer_aligned, io_us_size,
			RPMA_MR_USAGE_READ_DST | RPMA_MR_USAGE_READ_SRC |
			RPMA_MR_USAGE_WRITE_DST | RPMA_MR_USAGE_WRITE_SRC |
			RPMA_MR_USAGE_FLUSH_TYPE_PERSISTENT, &ccd->orig_mr)))
		librpma_td_verror(td, ret, "rpma_mr_reg");
	return ret;
}

int librpma_fio_client_get_file_size(struct thread_data *td,
		struct fio_file *f)
{
	struct librpma_fio_client_data *ccd = td->io_ops_data;

	f->real_file_size = ccd->ws_size;
	fio_file_set_size_known(f);

	return 0;
}

static enum fio_q_status client_queue_sync(struct thread_data *td,
		struct io_u *io_u)
{
	struct librpma_fio_client_data *ccd = td->io_ops_data;
	struct ibv_wc wc;
	unsigned io_u_index;
	int ret;

	/* execute io_u */
	if (io_u->ddir == DDIR_READ) {
		/* post an RDMA read operation */
		if (librpma_fio_client_io_read(td, io_u,
				RPMA_F_COMPLETION_ALWAYS))
			goto err;
	} else if (io_u->ddir == DDIR_WRITE) {
		/* post an RDMA write operation */
		if (librpma_fio_client_io_write(td, io_u))
			goto err;
		if (ccd->flush(td, io_u, io_u, io_u->xfer_buflen))
			goto err;
	} else {
		log_err("unsupported IO mode: %s\n", io_ddir_name(io_u->ddir));
		goto err;
	}

	do {
		/* get a completion */
		ret = rpma_cq_get_wc(ccd->cq, 1, &wc, NULL);
		if (ret == RPMA_E_NO_COMPLETION) {
			/* lack of completion is not an error */
			continue;
		} else if (ret != 0) {
			/* an error occurred */
			librpma_td_verror(td, ret, "rpma_cq_get_wc");
			goto err;
		}

		/* if the io_u has completed with an error */
		if (wc.status != IBV_WC_SUCCESS)
			goto err;

		if (wc.opcode == IBV_WC_SEND)
			++ccd->op_send_completed;
		else {
			if (wc.opcode == IBV_WC_RECV)
				++ccd->op_recv_completed;

			break;
		}
	} while (1);

	if (ccd->get_io_u_index(&wc, &io_u_index) != 1)
		goto err;

	if (io_u->index != io_u_index) {
		log_err(
			"no matching io_u for received completion found (io_u_index=%u)\n",
			io_u_index);
		goto err;
	}

	/* make sure all SENDs are completed before exit - clean up SQ */
	if (librpma_fio_client_io_complete_all_sends(td))
		goto err;

	return FIO_Q_COMPLETED;

err:
	io_u->error = -1;
	return FIO_Q_COMPLETED;
}

enum fio_q_status librpma_fio_client_queue(struct thread_data *td,
		struct io_u *io_u)
{
	struct librpma_fio_client_data *ccd = td->io_ops_data;

	if (ccd->io_u_queued_nr == (int)td->o.iodepth)
		return FIO_Q_BUSY;

	if (td->o.sync_io)
		return client_queue_sync(td, io_u);

	/* io_u -> queued[] */
	ccd->io_us_queued[ccd->io_u_queued_nr] = io_u;
	ccd->io_u_queued_nr++;

	return FIO_Q_QUEUED;
}

int librpma_fio_client_commit(struct thread_data *td)
{
	struct librpma_fio_client_data *ccd = td->io_ops_data;
	int flags = RPMA_F_COMPLETION_ON_ERROR;
	struct timespec now;
	bool fill_time;
	int i;
	struct io_u *flush_first_io_u = NULL;
	unsigned long long int flush_len = 0;

	if (!ccd->io_us_queued)
		return -1;

	/* execute all io_us from queued[] */
	for (i = 0; i < ccd->io_u_queued_nr; i++) {
		struct io_u *io_u = ccd->io_us_queued[i];

		if (io_u->ddir == DDIR_READ) {
			if (i + 1 == ccd->io_u_queued_nr ||
			    ccd->io_us_queued[i + 1]->ddir == DDIR_WRITE)
				flags = RPMA_F_COMPLETION_ALWAYS;
			/* post an RDMA read operation */
			if (librpma_fio_client_io_read(td, io_u, flags))
				return -1;
		} else if (io_u->ddir == DDIR_WRITE) {
			/* post an RDMA write operation */
			if (librpma_fio_client_io_write(td, io_u))
				return -1;

			/* cache the first io_u in the sequence */
			if (flush_first_io_u == NULL)
				flush_first_io_u = io_u;

			/*
			 * the flush length is the sum of all io_us forming
			 * the sequence
			 */
			flush_len += io_u->xfer_buflen;

			/*
			 * if the io_us are random, a rpma_flush is required
			 * after each one of them
			 */
			if (!td_random(td)) {
				/*
				 * When the io_us are sequential and
				 * the current io_u is not the last one and
				 * the next one is also a write operation,
				 * the flush can be postponed by one io_u and
				 * cover all of them which build a continuous
				 * sequence.
				 */
				if ((i + 1 < ccd->io_u_queued_nr) &&
				    (ccd->io_us_queued[i + 1]->ddir == DDIR_WRITE))
					continue;
			}

			/* flush all writes which build a continuous sequence */
			if (ccd->flush(td, flush_first_io_u, io_u, flush_len))
				return -1;

			/*
			 * reset the flush parameters in preparation for
			 * the next one
			 */
			flush_first_io_u = NULL;
			flush_len = 0;
		} else {
			log_err("unsupported IO mode: %s\n",
				io_ddir_name(io_u->ddir));
			return -1;
		}
	}

	if ((fill_time = fio_fill_issue_time(td))) {
		fio_gettime(&now, NULL);

		/*
		 * only used for iolog
		 */
		if (td->o.read_iolog_file)
			memcpy(&td->last_issue, &now, sizeof(now));

	}
	/* move executed io_us from queued[] to flight[] */
	for (i = 0; i < ccd->io_u_queued_nr; i++) {
		struct io_u *io_u = ccd->io_us_queued[i];

		/* FIO does not do this if the engine is asynchronous */
		if (fill_time)
			memcpy(&io_u->issue_time, &now, sizeof(now));

		/* move executed io_us from queued[] to flight[] */
		ccd->io_us_flight[ccd->io_u_flight_nr] = io_u;
		ccd->io_u_flight_nr++;

		/*
		 * FIO says:
		 * If an engine has the commit hook
		 * it has to call io_u_queued() itself.
		 */
		io_u_queued(td, io_u);
	}

	/* FIO does not do this if an engine has the commit hook. */
	io_u_mark_submit(td, ccd->io_u_queued_nr);
	ccd->io_u_queued_nr = 0;

	return 0;
}
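
/*
 * Illustrative example of the flush coalescing in librpma_fio_client_commit()
 * above (sequential workload): for a queued sequence WRITE(A), WRITE(B),
 * WRITE(C), READ(D) the engine posts three RDMA writes but only one flush,
 * issued together with WRITE(C) and covering A..C (flush_len is the sum of
 * the three transfer lengths). For a random workload every write is followed
 * by its own flush.
 */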

/*
 * RETURN VALUE
 * - > 0  - a number of completed io_us
 * -   0  - when no completions were received
 * - (-1) - when an error occurred
 */
static int client_getevent_process(struct thread_data *td)
{
	struct librpma_fio_client_data *ccd = td->io_ops_data;
	struct ibv_wc wc;
	/* io_u->index of completed io_u (wc.wr_id) */
	unsigned int io_u_index;
	/* # of completed io_us */
	int cmpl_num = 0;
	/* helpers */
	struct io_u *io_u;
	int i;
	int ret;

	/* get a completion */
	if ((ret = rpma_cq_get_wc(ccd->cq, 1, &wc, NULL))) {
		if (ret == RPMA_E_NO_COMPLETION) {
			/* lack of completion is not an error */
			return 0;
		}

		/* an error occurred */
		librpma_td_verror(td, ret, "rpma_cq_get_wc");
		return -1;
	}

	/* if the io_u has completed with an error */
	if (wc.status != IBV_WC_SUCCESS) {
		td->error = wc.status;
		return -1;
	}

	if (wc.opcode == IBV_WC_SEND)
		++ccd->op_send_completed;
	else if (wc.opcode == IBV_WC_RECV)
		++ccd->op_recv_completed;

	if ((ret = ccd->get_io_u_index(&wc, &io_u_index)) != 1)
		return ret;

	/* look for an io_u being completed */
	for (i = 0; i < ccd->io_u_flight_nr; ++i) {
		if (ccd->io_us_flight[i]->index == io_u_index) {
			cmpl_num = i + 1;
			break;
		}
	}

	/* if no matching io_u has been found */
	if (cmpl_num == 0) {
		log_err(
			"no matching io_u for received completion found (io_u_index=%u)\n",
			io_u_index);
		return -1;
	}

	/* move completed io_us to the completed in-memory queue */
	for (i = 0; i < cmpl_num; ++i) {
		/* get and prepare io_u */
		io_u = ccd->io_us_flight[i];

		/* append to the queue */
		ccd->io_us_completed[ccd->io_u_completed_nr] = io_u;
		ccd->io_u_completed_nr++;
	}

	/* remove completed io_us from the flight queue */
	for (i = cmpl_num; i < ccd->io_u_flight_nr; ++i)
		ccd->io_us_flight[i - cmpl_num] = ccd->io_us_flight[i];
	ccd->io_u_flight_nr -= cmpl_num;

	return cmpl_num;
}

int librpma_fio_client_getevents(struct thread_data *td, unsigned int min,
		unsigned int max, const struct timespec *t)
{
	struct librpma_fio_client_data *ccd = td->io_ops_data;
	/* total # of completed io_us */
	int cmpl_num_total = 0;
	/* # of completed io_us from a single event */
	int cmpl_num;

	do {
		cmpl_num = client_getevent_process(td);
		if (cmpl_num > 0) {
			/* new completions collected */
			cmpl_num_total += cmpl_num;
		} else if (cmpl_num == 0) {
			/*
			 * It is required to make sure that CQEs for SENDs
			 * will flow at least at the same pace as CQEs for RECVs.
			 */
			if (cmpl_num_total >= min &&
			    ccd->op_send_completed >= ccd->op_recv_completed)
				break;

			/*
			 * To reduce CPU consumption one can use
			 * the rpma_cq_wait() function.
			 * Note that this greatly increases the latency
			 * and makes the results less stable.
			 * The bandwidth stays more or less the same.
			 */
		} else {
			/* an error occurred */
			return -1;
		}

		/*
		 * The expected max can be exceeded if CQEs for RECVs come up
		 * faster than CQEs for SENDs. But it is required to make sure
		 * CQEs for SENDs will flow at least at the same pace as CQEs
		 * for RECVs.
		 */
	} while (cmpl_num_total < max ||
			ccd->op_send_completed < ccd->op_recv_completed);

	/*
	 * All posted SENDs are completed and RECVs for them (responses) are
	 * completed. This is the initial situation so the counters are reset.
	 */
	if (ccd->op_send_posted == ccd->op_send_completed &&
			ccd->op_send_completed == ccd->op_recv_completed) {
		ccd->op_send_posted = 0;
		ccd->op_send_completed = 0;
		ccd->op_recv_completed = 0;
	}

	return cmpl_num_total;
}
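
/*
 * Illustrative note on the SEND/RECV pacing above: in the messaging-based
 * (GPSPM) flavour a flush is expected to be a SEND of a flush request plus a
 * RECV of its response, so librpma_fio_client_getevents() keeps polling until
 * the completed SENDs have caught up with the completed RECVs before it
 * reports the collected events back to FIO.
 */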

struct io_u *librpma_fio_client_event(struct thread_data *td, int event)
{
	struct librpma_fio_client_data *ccd = td->io_ops_data;
	struct io_u *io_u;
	int i;

	/* get the first io_u from the queue */
	io_u = ccd->io_us_completed[0];

	/* remove the first io_u from the queue */
	for (i = 1; i < ccd->io_u_completed_nr; ++i)
		ccd->io_us_completed[i - 1] = ccd->io_us_completed[i];
	ccd->io_u_completed_nr--;

	dprint_io_u(io_u, "client_event");

	return io_u;
}

char *librpma_fio_client_errdetails(struct io_u *io_u)
{
	/* get the string representation of an error */
	enum ibv_wc_status status = io_u->error;
	const char *status_str = ibv_wc_status_str(status);

	char *details = strdup(status_str);
	if (details == NULL) {
		fprintf(stderr, "Error: %s\n", status_str);
		fprintf(stderr, "Fatal error: out of memory. Aborting.\n");
		abort();
	}

	/* FIO frees the returned string when it becomes obsolete */
	return details;
}

int librpma_fio_server_init(struct thread_data *td)
{
	struct librpma_fio_options_values *o = td->eo;
	struct librpma_fio_server_data *csd;
	struct ibv_context *dev = NULL;
	enum rpma_log_level log_level_aux = RPMA_LOG_LEVEL_WARNING;
	int ret = -1;

	/* --debug=net sets RPMA_LOG_THRESHOLD_AUX to RPMA_LOG_LEVEL_INFO */
#ifdef FIO_INC_DEBUG
	if ((1UL << FD_NET) & fio_debug)
		log_level_aux = RPMA_LOG_LEVEL_INFO;
#endif

	/* configure logging thresholds to see more details */
	rpma_log_set_threshold(RPMA_LOG_THRESHOLD, RPMA_LOG_LEVEL_INFO);
	rpma_log_set_threshold(RPMA_LOG_THRESHOLD_AUX, log_level_aux);

	/* obtain an IBV context for a local IP address */
	if ((ret = rpma_utils_get_ibv_context(o->server_ip,
			RPMA_UTIL_IBV_CONTEXT_LOCAL, &dev))) {
		librpma_td_verror(td, ret, "rpma_utils_get_ibv_context");
		return -1;
	}

	/* allocate server's data */
	csd = calloc(1, sizeof(*csd));
	if (csd == NULL) {
		td_verror(td, errno, "calloc");
		return -1;
	}

	/* create a new peer object */
	if ((ret = rpma_peer_new(dev, &csd->peer))) {
		librpma_td_verror(td, ret, "rpma_peer_new");
		goto err_free_csd;
	}

	td->io_ops_data = csd;

	return 0;

err_free_csd:
	free(csd);

	return -1;
}

void librpma_fio_server_cleanup(struct thread_data *td)
{
	struct librpma_fio_server_data *csd = td->io_ops_data;
	int ret;

	if (csd == NULL)
		return;

	/* free the peer */
	if ((ret = rpma_peer_delete(&csd->peer)))
		librpma_td_verror(td, ret, "rpma_peer_delete");

	free(csd);
}

int librpma_fio_server_open_file(struct thread_data *td, struct fio_file *f,
		struct rpma_conn_cfg *cfg)
{
	struct librpma_fio_server_data *csd = td->io_ops_data;
	struct librpma_fio_options_values *o = td->eo;
	enum rpma_conn_event conn_event = RPMA_CONN_UNDEFINED;
	struct librpma_fio_workspace ws = {0};
	struct rpma_conn_private_data pdata;
	uint32_t max_msg_num;
	struct rpma_conn_req *conn_req;
	struct rpma_conn *conn;
	struct rpma_mr_local *mr;
	char port_td[LIBRPMA_FIO_PORT_STR_LEN_MAX];
	struct rpma_ep *ep;
	size_t mem_size = td->o.size;
	size_t mr_desc_size;
	void *ws_ptr;
	bool is_dram;
	int usage_mem_type;
	int ret;

	if (!f->file_name) {
		log_err("fio: filename is not set\n");
		return -1;
	}

	/* start a listening endpoint at addr:port */
	if (librpma_fio_td_port(o->port, td, port_td))
		return -1;

	if ((ret = rpma_ep_listen(csd->peer, o->server_ip, port_td, &ep))) {
		librpma_td_verror(td, ret, "rpma_ep_listen");
		return -1;
	}

	is_dram = !strcmp(f->file_name, "malloc");
	if (is_dram) {
		/* allocation from DRAM using posix_memalign() */
		ws_ptr = librpma_fio_allocate_dram(td, mem_size, &csd->mem);
		usage_mem_type = RPMA_MR_USAGE_FLUSH_TYPE_VISIBILITY;
	} else {
		/* allocation from PMEM using pmem_map_file() */
		ws_ptr = librpma_fio_allocate_pmem(td, f, mem_size, &csd->mem);
		usage_mem_type = RPMA_MR_USAGE_FLUSH_TYPE_PERSISTENT;
	}

	if (ws_ptr == NULL)
		goto err_ep_shutdown;

	f->real_file_size = mem_size;

	if ((ret = rpma_mr_reg(csd->peer, ws_ptr, mem_size,
			RPMA_MR_USAGE_READ_DST | RPMA_MR_USAGE_READ_SRC |
			RPMA_MR_USAGE_WRITE_DST | RPMA_MR_USAGE_WRITE_SRC |
			usage_mem_type, &mr))) {
		librpma_td_verror(td, ret, "rpma_mr_reg");
		goto err_free;
	}

	if (!is_dram && f->filetype == FIO_TYPE_FILE) {
		ret = rpma_mr_advise(mr, 0, mem_size,
				IBV_ADVISE_MR_ADVICE_PREFETCH_WRITE,
				IBV_ADVISE_MR_FLAG_FLUSH);
		if (ret) {
			librpma_td_verror(td, ret, "rpma_mr_advise");
			/* an invalid argument is an error */
			if (ret == RPMA_E_INVAL)
				goto err_mr_dereg;

			/* log_err used instead of log_info to avoid corruption of the JSON output */
			log_err("Note: rpma_mr_advise(3) failing with RPMA_E_NOSUPP or RPMA_E_PROVIDER may come with a performance penalty, but it is not a blocker for running the benchmark.\n");
		}
	}

	/* get size of the memory region's descriptor */
	if ((ret = rpma_mr_get_descriptor_size(mr, &mr_desc_size))) {
		librpma_td_verror(td, ret, "rpma_mr_get_descriptor_size");
		goto err_mr_dereg;
	}

	/* verify size of the memory region's descriptor */
	if (mr_desc_size > LIBRPMA_FIO_DESCRIPTOR_MAX_SIZE) {
		log_err(
			"size of the memory region's descriptor is too big (max=%i)\n",
			LIBRPMA_FIO_DESCRIPTOR_MAX_SIZE);
		goto err_mr_dereg;
	}

	/* get the memory region's descriptor */
	if ((ret = rpma_mr_get_descriptor(mr, &ws.descriptor[0]))) {
		librpma_td_verror(td, ret, "rpma_mr_get_descriptor");
		goto err_mr_dereg;
	}

	if (cfg != NULL) {
		if ((ret = rpma_conn_cfg_get_rq_size(cfg, &max_msg_num))) {
			librpma_td_verror(td, ret, "rpma_conn_cfg_get_rq_size");
			goto err_mr_dereg;
		}

		/* verify whether iodepth fits into uint16_t */
		if (max_msg_num > UINT16_MAX) {
			log_err("fio: iodepth too big (%u > %u)\n",
				max_msg_num, UINT16_MAX);
			goto err_mr_dereg;
		}

		ws.max_msg_num = max_msg_num;
	}

	/* prepare a workspace description */
	ws.direct_write_to_pmem = o->direct_write_to_pmem;
	ws.mr_desc_size = mr_desc_size;
	pdata.ptr = &ws;
	pdata.len = sizeof(ws);

	/* receive an incoming connection request */
	if ((ret = rpma_ep_next_conn_req(ep, cfg, &conn_req))) {
		librpma_td_verror(td, ret, "rpma_ep_next_conn_req");
		goto err_mr_dereg;
	}

	if (csd->prepare_connection && csd->prepare_connection(td, conn_req))
		goto err_req_delete;

	/* accept the connection request and obtain the connection object */
	if ((ret = rpma_conn_req_connect(&conn_req, &pdata, &conn))) {
		librpma_td_verror(td, ret, "rpma_conn_req_connect");
		goto err_req_delete;
	}

	/* wait for the connection to be established */
	if ((ret = rpma_conn_next_event(conn, &conn_event))) {
		librpma_td_verror(td, ret, "rpma_conn_next_event");
		goto err_conn_delete;
	} else if (conn_event != RPMA_CONN_ESTABLISHED) {
		log_err("rpma_conn_next_event returned an unexpected event\n");
		goto err_conn_delete;
	}

	/* end-point is no longer needed */
	(void) rpma_ep_shutdown(&ep);

	csd->ws_mr = mr;
	csd->ws_ptr = ws_ptr;
	csd->conn = conn;

	/* get the connection's main CQ */
	if ((ret = rpma_conn_get_cq(csd->conn, &csd->cq))) {
		librpma_td_verror(td, ret, "rpma_conn_get_cq");
		goto err_conn_delete;
	}

	return 0;

err_conn_delete:
	(void) rpma_conn_delete(&conn);

err_req_delete:
	(void) rpma_conn_req_delete(&conn_req);

err_mr_dereg:
	(void) rpma_mr_dereg(&mr);

err_free:
	librpma_fio_free(&csd->mem);

err_ep_shutdown:
	(void) rpma_ep_shutdown(&ep);

	return -1;
}

int librpma_fio_server_close_file(struct thread_data *td, struct fio_file *f)
{
	struct librpma_fio_server_data *csd = td->io_ops_data;
	enum rpma_conn_event conn_event = RPMA_CONN_UNDEFINED;
	int rv = 0;
	int ret;

	/* wait for the connection to be closed */
	ret = rpma_conn_next_event(csd->conn, &conn_event);
	if (!ret && conn_event != RPMA_CONN_CLOSED) {
		log_err("rpma_conn_next_event returned an unexpected event\n");
		rv = -1;
	}

	if ((ret = rpma_conn_disconnect(csd->conn))) {
		librpma_td_verror(td, ret, "rpma_conn_disconnect");
		rv = -1;
	}

	if ((ret = rpma_conn_delete(&csd->conn))) {
		librpma_td_verror(td, ret, "rpma_conn_delete");
		rv = -1;
	}

	if ((ret = rpma_mr_dereg(&csd->ws_mr))) {
		librpma_td_verror(td, ret, "rpma_mr_dereg");
		rv = -1;
	}

	librpma_fio_free(&csd->mem);

	return rv;
}