rpma: gpspm: introduce the busy_wait_polling toggle
[fio.git] / engines / librpma_fio.c
/*
 * librpma_fio: librpma_apm and librpma_gpspm engines' common part.
 *
 * Copyright 2021, Intel Corporation
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License,
 * version 2 as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 */

#include "librpma_fio.h"

#include <libpmem.h>

struct fio_option librpma_fio_options[] = {
	{
		.name = "serverip",
		.lname = "rpma_server_ip",
		.type = FIO_OPT_STR_STORE,
		.off1 = offsetof(struct librpma_fio_options_values, server_ip),
		.help = "IP address the server is listening on",
		.def = "",
		.category = FIO_OPT_C_ENGINE,
		.group = FIO_OPT_G_LIBRPMA,
	},
	{
		.name = "port",
		.lname = "rpma_server_port",
		.type = FIO_OPT_STR_STORE,
		.off1 = offsetof(struct librpma_fio_options_values, port),
		.help = "port the server is listening on",
		.def = "7204",
		.category = FIO_OPT_C_ENGINE,
		.group = FIO_OPT_G_LIBRPMA,
	},
	{
		.name = "direct_write_to_pmem",
		.lname = "Direct Write to PMem (via RDMA) from the remote host is possible",
		.type = FIO_OPT_BOOL,
		.off1 = offsetof(struct librpma_fio_options_values,
				direct_write_to_pmem),
		.help = "Set to true ONLY when Direct Write to PMem from the remote host is possible (https://pmem.io/rpma/documentation/basic-direct-write-to-pmem.html)",
		.def = "",
		.category = FIO_OPT_C_ENGINE,
		.group = FIO_OPT_G_LIBRPMA,
	},
	{
		.name = "busy_wait_polling",
		.lname = "Set to 0 to wait for completion instead of busy-wait polling completion.",
		.type = FIO_OPT_BOOL,
		.off1 = offsetof(struct librpma_fio_options_values,
				busy_wait_polling),
		.help = "Set to false if you want to reduce CPU usage",
		.def = "1",
		.category = FIO_OPT_C_ENGINE,
		.group = FIO_OPT_G_LIBRPMA,
	},
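	/*
	 * Note: busy_wait_polling is consumed by the GPSPM (server) side of
	 * the engine; when it is set to 0 the server is expected to wait for
	 * completions instead of busy-polling for them, trading latency for
	 * lower CPU usage.
	 */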
	{
		.name = NULL,
	},
};
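
/*
 * Example job file snippet using the options above (all values are only
 * illustrative):
 *
 *   [client]
 *   ioengine=librpma_apm_client
 *   serverip=192.168.0.1
 *   port=7204
 *   direct_write_to_pmem=1
 *
 * Note that the effective port is per-thread: with port=7204, fio thread #1
 * uses 7204, thread #2 uses 7205, and so on (see librpma_fio_td_port()).
 */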

int librpma_fio_td_port(const char *port_base_str, struct thread_data *td,
		char *port_out)
{
	unsigned long int port_ul = strtoul(port_base_str, NULL, 10);
	unsigned int port_new;

	port_out[0] = '\0';

	if (port_ul == ULONG_MAX) {
		td_verror(td, errno, "strtoul");
		return -1;
	}
	port_ul += td->thread_number - 1;
	if (port_ul >= UINT_MAX) {
		log_err("[%u] port number (%lu) bigger than UINT_MAX\n",
			td->thread_number, port_ul);
		return -1;
	}

	port_new = port_ul;
	snprintf(port_out, LIBRPMA_FIO_PORT_STR_LEN_MAX - 1, "%u", port_new);

	return 0;
}

char *librpma_fio_allocate_dram(struct thread_data *td, size_t size,
		struct librpma_fio_mem *mem)
{
	char *mem_ptr = NULL;
	int ret;

	if ((ret = posix_memalign((void **)&mem_ptr, page_size, size))) {
		log_err("fio: posix_memalign() failed\n");
		td_verror(td, ret, "posix_memalign");
		return NULL;
	}

	mem->mem_ptr = mem_ptr;
	mem->size_mmap = 0;

	return mem_ptr;
}

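/*
 * Map a PMem file and return a per-thread workspace within it: thread N gets
 * the [(N - 1) * size, N * size) slice, so the file has to be at least
 * (number of threads) * size bytes large. For example, 4 threads with
 * size=1GiB need a file of at least 4GiB.
 */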
char *librpma_fio_allocate_pmem(struct thread_data *td, const char *filename,
		size_t size, struct librpma_fio_mem *mem)
{
	size_t size_mmap = 0;
	char *mem_ptr = NULL;
	int is_pmem = 0;
	size_t ws_offset;

	if (size % page_size) {
		log_err("fio: size (%zu) is not aligned to page size (%zu)\n",
			size, page_size);
		return NULL;
	}

	ws_offset = (td->thread_number - 1) * size;

	if (!filename) {
		log_err("fio: filename is not set\n");
		return NULL;
	}

	/* map the file */
	mem_ptr = pmem_map_file(filename, 0 /* len */, 0 /* flags */,
			0 /* mode */, &size_mmap, &is_pmem);
	if (mem_ptr == NULL) {
		log_err("fio: pmem_map_file(%s) failed\n", filename);
		/* pmem_map_file() sets errno on failure */
		td_verror(td, errno, "pmem_map_file");
		return NULL;
	}

	/* pmem is expected */
	if (!is_pmem) {
		log_err("fio: %s is not located in persistent memory\n",
			filename);
		goto err_unmap;
	}

	/* check size of allocated persistent memory */
	if (size_mmap < ws_offset + size) {
		log_err(
			"fio: %s is too small to handle so many threads (%zu < %zu)\n",
			filename, size_mmap, ws_offset + size);
		goto err_unmap;
	}

	log_info("fio: size of memory mapped from the file %s: %zu\n",
		filename, size_mmap);

	mem->mem_ptr = mem_ptr;
	mem->size_mmap = size_mmap;

	return mem_ptr + ws_offset;

err_unmap:
	(void) pmem_unmap(mem_ptr, size_mmap);
	return NULL;
}

void librpma_fio_free(struct librpma_fio_mem *mem)
{
	if (mem->size_mmap)
		(void) pmem_unmap(mem->mem_ptr, mem->size_mmap);
	else
		free(mem->mem_ptr);
}

#define LIBRPMA_FIO_RETRY_MAX_NO	10
#define LIBRPMA_FIO_RETRY_DELAY_S	5

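/*
 * Client initialization: obtain an ibv context for the server's IP, allocate
 * the software queues, create the peer, connect (retrying a rejected
 * connection up to LIBRPMA_FIO_RETRY_MAX_NO times) and read the server's
 * workspace description from the connection's private data.
 */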
int librpma_fio_client_init(struct thread_data *td,
		struct rpma_conn_cfg *cfg)
{
	struct librpma_fio_client_data *ccd;
	struct librpma_fio_options_values *o = td->eo;
	struct ibv_context *dev = NULL;
	char port_td[LIBRPMA_FIO_PORT_STR_LEN_MAX];
	struct rpma_conn_req *req = NULL;
	enum rpma_conn_event event;
	struct rpma_conn_private_data pdata;
	enum rpma_log_level log_level_aux = RPMA_LOG_LEVEL_WARNING;
	int remote_flush_type;
	int retry;
	int ret;

	/* --debug=net sets RPMA_LOG_THRESHOLD_AUX to RPMA_LOG_LEVEL_INFO */
#ifdef FIO_INC_DEBUG
	if ((1UL << FD_NET) & fio_debug)
		log_level_aux = RPMA_LOG_LEVEL_INFO;
#endif

	/* configure logging thresholds to see more details */
	rpma_log_set_threshold(RPMA_LOG_THRESHOLD, RPMA_LOG_LEVEL_INFO);
	rpma_log_set_threshold(RPMA_LOG_THRESHOLD_AUX, log_level_aux);

	/* obtain an IBV context for a remote IP address */
	if ((ret = rpma_utils_get_ibv_context(o->server_ip,
			RPMA_UTIL_IBV_CONTEXT_REMOTE, &dev))) {
		librpma_td_verror(td, ret, "rpma_utils_get_ibv_context");
		return -1;
	}

	/* allocate client's data */
	ccd = calloc(1, sizeof(*ccd));
	if (ccd == NULL) {
		td_verror(td, errno, "calloc");
		return -1;
	}

	/* allocate all in-memory queues */
	ccd->io_us_queued = calloc(td->o.iodepth, sizeof(*ccd->io_us_queued));
	if (ccd->io_us_queued == NULL) {
		td_verror(td, errno, "calloc");
		goto err_free_ccd;
	}

	ccd->io_us_flight = calloc(td->o.iodepth, sizeof(*ccd->io_us_flight));
	if (ccd->io_us_flight == NULL) {
		td_verror(td, errno, "calloc");
		goto err_free_io_u_queues;
	}

	ccd->io_us_completed = calloc(td->o.iodepth,
			sizeof(*ccd->io_us_completed));
	if (ccd->io_us_completed == NULL) {
		td_verror(td, errno, "calloc");
		goto err_free_io_u_queues;
	}

	/* create a new peer object */
	if ((ret = rpma_peer_new(dev, &ccd->peer))) {
		librpma_td_verror(td, ret, "rpma_peer_new");
		goto err_free_io_u_queues;
	}

	/* create a connection request */
	if (librpma_fio_td_port(o->port, td, port_td))
		goto err_peer_delete;

	for (retry = 0; retry < LIBRPMA_FIO_RETRY_MAX_NO; retry++) {
		if ((ret = rpma_conn_req_new(ccd->peer, o->server_ip, port_td,
				cfg, &req))) {
			librpma_td_verror(td, ret, "rpma_conn_req_new");
			goto err_peer_delete;
		}

		/*
		 * Connect the connection request
		 * and obtain the connection object.
		 */
		if ((ret = rpma_conn_req_connect(&req, NULL, &ccd->conn))) {
			librpma_td_verror(td, ret, "rpma_conn_req_connect");
			goto err_req_delete;
		}

		/* wait for the connection to establish */
		if ((ret = rpma_conn_next_event(ccd->conn, &event))) {
			librpma_td_verror(td, ret, "rpma_conn_next_event");
			goto err_conn_delete;
		} else if (event == RPMA_CONN_ESTABLISHED) {
			break;
		} else if (event == RPMA_CONN_REJECTED) {
			(void) rpma_conn_disconnect(ccd->conn);
			(void) rpma_conn_delete(&ccd->conn);
			if (retry < LIBRPMA_FIO_RETRY_MAX_NO - 1) {
				log_err("Thread [%d]: Retrying (#%i) ...\n",
					td->thread_number, retry + 1);
				sleep(LIBRPMA_FIO_RETRY_DELAY_S);
			} else {
				log_err(
					"Thread [%d]: The maximum number of retries exceeded. Closing.\n",
					td->thread_number);
			}
		} else {
			log_err(
				"rpma_conn_next_event returned an unexpected event: (%s != RPMA_CONN_ESTABLISHED)\n",
				rpma_utils_conn_event_2str(event));
			goto err_conn_delete;
		}
	}

	if (retry > 0)
		log_err("Thread [%d]: Connected after retry #%i\n",
			td->thread_number, retry);

	if (ccd->conn == NULL)
		goto err_peer_delete;

	/* get the connection's private data sent from the server */
	if ((ret = rpma_conn_get_private_data(ccd->conn, &pdata))) {
		librpma_td_verror(td, ret, "rpma_conn_get_private_data");
		goto err_conn_delete;
	}

	/* get the server's workspace representation */
	ccd->ws = pdata.ptr;

	/* create the server's memory representation */
	if ((ret = rpma_mr_remote_from_descriptor(&ccd->ws->descriptor[0],
			ccd->ws->mr_desc_size, &ccd->server_mr))) {
		librpma_td_verror(td, ret, "rpma_mr_remote_from_descriptor");
		goto err_conn_delete;
	}

	/* get the total size of the shared server memory */
	if ((ret = rpma_mr_remote_get_size(ccd->server_mr, &ccd->ws_size))) {
		librpma_td_verror(td, ret, "rpma_mr_remote_get_size");
		goto err_conn_delete;
	}

	/* get flush type of the remote node */
	if ((ret = rpma_mr_remote_get_flush_type(ccd->server_mr,
			&remote_flush_type))) {
		librpma_td_verror(td, ret, "rpma_mr_remote_get_flush_type");
		goto err_conn_delete;
	}

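	/*
	 * If the server's memory supports the persistent flush type (Direct
	 * Write to PMem is possible), flushes go all the way to persistence;
	 * otherwise only the visibility flush (to the remote memory, without
	 * a persistence guarantee) is used.
	 */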
	ccd->server_mr_flush_type =
		(remote_flush_type & RPMA_MR_USAGE_FLUSH_TYPE_PERSISTENT) ?
		RPMA_FLUSH_TYPE_PERSISTENT : RPMA_FLUSH_TYPE_VISIBILITY;

	/*
	 * Ensure the io_u buffer allocation is page-size-aligned, which is
	 * required to register it for RDMA.
	 * The user-provided value is intentionally ignored.
	 */
	td->o.mem_align = page_size;

	td->io_ops_data = ccd;

	return 0;

err_conn_delete:
	(void) rpma_conn_disconnect(ccd->conn);
	(void) rpma_conn_delete(&ccd->conn);

err_req_delete:
	(void) rpma_conn_req_delete(&req);

err_peer_delete:
	(void) rpma_peer_delete(&ccd->peer);

err_free_io_u_queues:
	free(ccd->io_us_queued);
	free(ccd->io_us_flight);
	free(ccd->io_us_completed);

err_free_ccd:
	free(ccd);

	return -1;
}

void librpma_fio_client_cleanup(struct thread_data *td)
{
	struct librpma_fio_client_data *ccd = td->io_ops_data;
	enum rpma_conn_event ev;
	int ret;

	if (ccd == NULL)
		return;

	/* deregister the io_us' memory */
	if ((ret = rpma_mr_dereg(&ccd->orig_mr)))
		librpma_td_verror(td, ret, "rpma_mr_dereg");
	/* delete the server's memory representation */
	if ((ret = rpma_mr_remote_delete(&ccd->server_mr)))
		librpma_td_verror(td, ret, "rpma_mr_remote_delete");
	/* initiate disconnection */
	if ((ret = rpma_conn_disconnect(ccd->conn)))
		librpma_td_verror(td, ret, "rpma_conn_disconnect");
	/* wait for the disconnection to complete */
	if ((ret = rpma_conn_next_event(ccd->conn, &ev))) {
		librpma_td_verror(td, ret, "rpma_conn_next_event");
	} else if (ev != RPMA_CONN_CLOSED) {
		log_err(
			"client_cleanup received an unexpected event (%s != RPMA_CONN_CLOSED)\n",
			rpma_utils_conn_event_2str(ev));
	}
	/* delete the connection */
	if ((ret = rpma_conn_delete(&ccd->conn)))
		librpma_td_verror(td, ret, "rpma_conn_delete");
	/* delete the peer */
	if ((ret = rpma_peer_delete(&ccd->peer)))
		librpma_td_verror(td, ret, "rpma_peer_delete");
	/* free the software queues */
	free(ccd->io_us_queued);
	free(ccd->io_us_flight);
	free(ccd->io_us_completed);
	free(ccd);
	td->io_ops_data = NULL; /* zero ccd */
}

int librpma_fio_file_nop(struct thread_data *td, struct fio_file *f)
{
	/* NOP */
	return 0;
}

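/*
 * The io_u buffers are registered once, with all usages (read/write source
 * and destination plus the persistent flush type), so the same registration
 * can serve both the APM and the GPSPM flavours of the engine.
 */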
int librpma_fio_client_post_init(struct thread_data *td)
{
	struct librpma_fio_client_data *ccd = td->io_ops_data;
	size_t io_us_size;
	int ret;

	/*
	 * td->orig_buffer is not aligned. The engine requires aligned io_us
	 * so FIO aligns up the address using the formula below.
	 */
	ccd->orig_buffer_aligned = PTR_ALIGN(td->orig_buffer, page_mask) +
			td->o.mem_align;

	/*
	 * Besides the space actually consumed by the io_us,
	 * td->orig_buffer_size also includes padding which can be omitted
	 * from the memory registration.
	 */
	io_us_size = (unsigned long long)td_max_bs(td) *
			(unsigned long long)td->o.iodepth;

	if ((ret = rpma_mr_reg(ccd->peer, ccd->orig_buffer_aligned, io_us_size,
			RPMA_MR_USAGE_READ_DST | RPMA_MR_USAGE_READ_SRC |
			RPMA_MR_USAGE_WRITE_DST | RPMA_MR_USAGE_WRITE_SRC |
			RPMA_MR_USAGE_FLUSH_TYPE_PERSISTENT, &ccd->orig_mr)))
		librpma_td_verror(td, ret, "rpma_mr_reg");
	return ret;
}

int librpma_fio_client_get_file_size(struct thread_data *td,
		struct fio_file *f)
{
	struct librpma_fio_client_data *ccd = td->io_ops_data;

	f->real_file_size = ccd->ws_size;
	fio_file_set_size_known(f);

	return 0;
}

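/*
 * Synchronous queue path: post the operation (an RDMA read, or an RDMA write
 * followed by a flush), then busy-poll until its completion arrives, counting
 * any SEND/RECV completions collected along the way.
 */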
static enum fio_q_status client_queue_sync(struct thread_data *td,
		struct io_u *io_u)
{
	struct librpma_fio_client_data *ccd = td->io_ops_data;
	struct rpma_completion cmpl;
	unsigned io_u_index;
	int ret;

	/* execute io_u */
	if (io_u->ddir == DDIR_READ) {
		/* post an RDMA read operation */
		if (librpma_fio_client_io_read(td, io_u,
				RPMA_F_COMPLETION_ALWAYS))
			goto err;
	} else if (io_u->ddir == DDIR_WRITE) {
		/* post an RDMA write operation */
		if (librpma_fio_client_io_write(td, io_u))
			goto err;
		if (ccd->flush(td, io_u, io_u, io_u->xfer_buflen))
			goto err;
	} else {
		log_err("unsupported IO mode: %s\n", io_ddir_name(io_u->ddir));
		goto err;
	}

	do {
		/* get a completion */
		ret = rpma_conn_completion_get(ccd->conn, &cmpl);
		if (ret == RPMA_E_NO_COMPLETION) {
			/* lack of completion is not an error */
			continue;
		} else if (ret != 0) {
			/* an error occurred */
			librpma_td_verror(td, ret, "rpma_conn_completion_get");
			goto err;
		}

		/* if the io_u has completed with an error */
		if (cmpl.op_status != IBV_WC_SUCCESS)
			goto err;

		if (cmpl.op == RPMA_OP_SEND)
			++ccd->op_send_completed;
		else {
			if (cmpl.op == RPMA_OP_RECV)
				++ccd->op_recv_completed;

			break;
		}
	} while (1);

	if (ccd->get_io_u_index(&cmpl, &io_u_index) != 1)
		goto err;

	if (io_u->index != io_u_index) {
		log_err(
			"no matching io_u for received completion found (io_u_index=%u)\n",
			io_u_index);
		goto err;
	}

	/* make sure all SENDs are completed before exit - clean up SQ */
	if (librpma_fio_client_io_complete_all_sends(td))
		goto err;

	return FIO_Q_COMPLETED;

err:
	io_u->error = -1;
	return FIO_Q_COMPLETED;
}

enum fio_q_status librpma_fio_client_queue(struct thread_data *td,
		struct io_u *io_u)
{
	struct librpma_fio_client_data *ccd = td->io_ops_data;

	if (ccd->io_u_queued_nr == (int)td->o.iodepth)
		return FIO_Q_BUSY;

	if (td->o.sync_io)
		return client_queue_sync(td, io_u);

	/* io_u -> queued[] */
	ccd->io_us_queued[ccd->io_u_queued_nr] = io_u;
	ccd->io_u_queued_nr++;

	return FIO_Q_QUEUED;
}

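/*
 * Commit hook: post all io_us collected in queued[]. Sequential writes are
 * coalesced under a single flush covering the whole contiguous sequence;
 * random writes are flushed one by one. Posted io_us are then moved from
 * queued[] to flight[].
 */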
int librpma_fio_client_commit(struct thread_data *td)
{
	struct librpma_fio_client_data *ccd = td->io_ops_data;
	int flags = RPMA_F_COMPLETION_ON_ERROR;
	struct timespec now;
	bool fill_time;
	int i;
	struct io_u *flush_first_io_u = NULL;
	unsigned long long int flush_len = 0;

	if (!ccd->io_us_queued)
		return -1;

	/* execute all io_us from queued[] */
	for (i = 0; i < ccd->io_u_queued_nr; i++) {
		struct io_u *io_u = ccd->io_us_queued[i];

		if (io_u->ddir == DDIR_READ) {
			if (i + 1 == ccd->io_u_queued_nr ||
			    ccd->io_us_queued[i + 1]->ddir == DDIR_WRITE)
				flags = RPMA_F_COMPLETION_ALWAYS;
			/* post an RDMA read operation */
			if (librpma_fio_client_io_read(td, io_u, flags))
				return -1;
		} else if (io_u->ddir == DDIR_WRITE) {
			/* post an RDMA write operation */
			if (librpma_fio_client_io_write(td, io_u))
				return -1;

			/* cache the first io_u in the sequence */
			if (flush_first_io_u == NULL)
				flush_first_io_u = io_u;

			/*
			 * the flush length is the sum of all io_u's creating
			 * the sequence
			 */
			flush_len += io_u->xfer_buflen;

			/*
			 * if io_u's are random the rpma_flush is required
			 * after each one of them
			 */
			if (!td_random(td)) {
				/*
				 * When the io_u's are sequential and
				 * the current io_u is not the last one and
				 * the next one is also a write operation
				 * the flush can be postponed by one io_u and
				 * cover all of them which build a continuous
				 * sequence.
				 */
				if ((i + 1 < ccd->io_u_queued_nr) &&
				    (ccd->io_us_queued[i + 1]->ddir == DDIR_WRITE))
					continue;
			}

			/* flush all writes which build a continuous sequence */
			if (ccd->flush(td, flush_first_io_u, io_u, flush_len))
				return -1;

			/*
			 * reset the flush parameters in preparation for
			 * the next one
			 */
			flush_first_io_u = NULL;
			flush_len = 0;
		} else {
			log_err("unsupported IO mode: %s\n",
				io_ddir_name(io_u->ddir));
			return -1;
		}
	}

	if ((fill_time = fio_fill_issue_time(td)))
		fio_gettime(&now, NULL);

	/* move executed io_us from queued[] to flight[] */
	for (i = 0; i < ccd->io_u_queued_nr; i++) {
		struct io_u *io_u = ccd->io_us_queued[i];

		/* FIO does not do this if the engine is asynchronous */
		if (fill_time)
			memcpy(&io_u->issue_time, &now, sizeof(now));

		/* move executed io_us from queued[] to flight[] */
		ccd->io_us_flight[ccd->io_u_flight_nr] = io_u;
		ccd->io_u_flight_nr++;

		/*
		 * FIO says:
		 * If an engine has the commit hook
		 * it has to call io_u_queued() itself.
		 */
		io_u_queued(td, io_u);
	}

	/* FIO does not do this if an engine has the commit hook. */
	io_u_mark_submit(td, ccd->io_u_queued_nr);
	ccd->io_u_queued_nr = 0;

	return 0;
}

/*
 * RETURN VALUE
 * - > 0  - the number of completed io_us
 * - 0    - when no completions were received
 * - (-1) - when an error occurred
 */
static int client_getevent_process(struct thread_data *td)
{
	struct librpma_fio_client_data *ccd = td->io_ops_data;
	struct rpma_completion cmpl;
	/* io_u->index of completed io_u (cmpl.op_context) */
	unsigned int io_u_index;
	/* # of completed io_us */
	int cmpl_num = 0;
	/* helpers */
	struct io_u *io_u;
	int i;
	int ret;

	/* get a completion */
	if ((ret = rpma_conn_completion_get(ccd->conn, &cmpl))) {
		if (ret == RPMA_E_NO_COMPLETION) {
			/* lack of completion is not an error */
			return 0;
		}

		/* an error occurred */
		librpma_td_verror(td, ret, "rpma_conn_completion_get");
		return -1;
	}

	/* if the io_u has completed with an error */
	if (cmpl.op_status != IBV_WC_SUCCESS) {
		td->error = cmpl.op_status;
		return -1;
	}

	if (cmpl.op == RPMA_OP_SEND)
		++ccd->op_send_completed;
	else if (cmpl.op == RPMA_OP_RECV)
		++ccd->op_recv_completed;

	if ((ret = ccd->get_io_u_index(&cmpl, &io_u_index)) != 1)
		return ret;

	/* look for an io_u being completed */
	for (i = 0; i < ccd->io_u_flight_nr; ++i) {
		if (ccd->io_us_flight[i]->index == io_u_index) {
			cmpl_num = i + 1;
			break;
		}
	}

	/* if no matching io_u has been found */
	if (cmpl_num == 0) {
		log_err(
			"no matching io_u for received completion found (io_u_index=%u)\n",
			io_u_index);
		return -1;
	}

	/* move completed io_us to the completed in-memory queue */
	for (i = 0; i < cmpl_num; ++i) {
		/* get and prepare io_u */
		io_u = ccd->io_us_flight[i];

		/* append to the queue */
		ccd->io_us_completed[ccd->io_u_completed_nr] = io_u;
		ccd->io_u_completed_nr++;
	}

	/* remove completed io_us from the flight queue */
	for (i = cmpl_num; i < ccd->io_u_flight_nr; ++i)
		ccd->io_us_flight[i - cmpl_num] = ccd->io_us_flight[i];
	ccd->io_u_flight_nr -= cmpl_num;

	return cmpl_num;
}

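/*
 * Collect at least min and at most max completions. While waiting, make sure
 * that completions of SENDs keep pace with completions of RECVs so the
 * op_send_completed / op_recv_completed counters stay balanced.
 */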
int librpma_fio_client_getevents(struct thread_data *td, unsigned int min,
		unsigned int max, const struct timespec *t)
{
	struct librpma_fio_client_data *ccd = td->io_ops_data;
	/* total # of completed io_us */
	int cmpl_num_total = 0;
	/* # of completed io_us from a single event */
	int cmpl_num;

	do {
		cmpl_num = client_getevent_process(td);
		if (cmpl_num > 0) {
			/* new completions collected */
			cmpl_num_total += cmpl_num;
		} else if (cmpl_num == 0) {
			/*
			 * It is required to make sure that CQEs for SENDs
			 * will flow at least at the same pace as CQEs for RECVs.
			 */
			if (cmpl_num_total >= min &&
			    ccd->op_send_completed >= ccd->op_recv_completed)
				break;

			/*
			 * To reduce CPU consumption one can use
			 * the rpma_conn_completion_wait() function.
			 * Note this greatly increases the latency
			 * and makes the results less stable.
			 * The bandwidth stays more or less the same.
			 */
		} else {
			/* an error occurred */
			return -1;
		}

		/*
		 * The expected max can be exceeded if CQEs for RECVs come up
		 * faster than CQEs for SENDs. But it is required to make sure
		 * CQEs for SENDs will flow at least at the same pace as CQEs
		 * for RECVs.
		 */
	} while (cmpl_num_total < max ||
			ccd->op_send_completed < ccd->op_recv_completed);

	/*
	 * All posted SENDs are completed and RECVs for them (responses) are
	 * completed. This is the initial situation so the counters are reset.
	 */
	if (ccd->op_send_posted == ccd->op_send_completed &&
	    ccd->op_send_completed == ccd->op_recv_completed) {
		ccd->op_send_posted = 0;
		ccd->op_send_completed = 0;
		ccd->op_recv_completed = 0;
	}

	return cmpl_num_total;
}

struct io_u *librpma_fio_client_event(struct thread_data *td, int event)
{
	struct librpma_fio_client_data *ccd = td->io_ops_data;
	struct io_u *io_u;
	int i;

	/* get the first io_u from the queue */
	io_u = ccd->io_us_completed[0];

	/* remove the first io_u from the queue */
	for (i = 1; i < ccd->io_u_completed_nr; ++i)
		ccd->io_us_completed[i - 1] = ccd->io_us_completed[i];
	ccd->io_u_completed_nr--;

	dprint_io_u(io_u, "client_event");

	return io_u;
}

char *librpma_fio_client_errdetails(struct io_u *io_u)
{
	/* get the string representation of an error */
	enum ibv_wc_status status = io_u->error;
	const char *status_str = ibv_wc_status_str(status);

	char *details = strdup(status_str);
	if (details == NULL) {
		fprintf(stderr, "Error: %s\n", status_str);
		fprintf(stderr, "Fatal error: out of memory. Aborting.\n");
		abort();
	}

	/* FIO frees the returned string when it becomes obsolete */
	return details;
}

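/*
 * Server initialization: obtain an ibv context for the locally configured IP
 * address and create the peer; the listening endpoint and the connection are
 * set up later, in librpma_fio_server_open_file().
 */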
int librpma_fio_server_init(struct thread_data *td)
{
	struct librpma_fio_options_values *o = td->eo;
	struct librpma_fio_server_data *csd;
	struct ibv_context *dev = NULL;
	enum rpma_log_level log_level_aux = RPMA_LOG_LEVEL_WARNING;
	int ret = -1;

	/* --debug=net sets RPMA_LOG_THRESHOLD_AUX to RPMA_LOG_LEVEL_INFO */
#ifdef FIO_INC_DEBUG
	if ((1UL << FD_NET) & fio_debug)
		log_level_aux = RPMA_LOG_LEVEL_INFO;
#endif

	/* configure logging thresholds to see more details */
	rpma_log_set_threshold(RPMA_LOG_THRESHOLD, RPMA_LOG_LEVEL_INFO);
	rpma_log_set_threshold(RPMA_LOG_THRESHOLD_AUX, log_level_aux);

	/* obtain an IBV context for a local IP address */
	if ((ret = rpma_utils_get_ibv_context(o->server_ip,
			RPMA_UTIL_IBV_CONTEXT_LOCAL, &dev))) {
		librpma_td_verror(td, ret, "rpma_utils_get_ibv_context");
		return -1;
	}

	/* allocate server's data */
	csd = calloc(1, sizeof(*csd));
	if (csd == NULL) {
		td_verror(td, errno, "calloc");
		return -1;
	}

	/* create a new peer object */
	if ((ret = rpma_peer_new(dev, &csd->peer))) {
		librpma_td_verror(td, ret, "rpma_peer_new");
		goto err_free_csd;
	}

	td->io_ops_data = csd;

	return 0;

err_free_csd:
	free(csd);

	return -1;
}

void librpma_fio_server_cleanup(struct thread_data *td)
{
	struct librpma_fio_server_data *csd = td->io_ops_data;
	int ret;

	if (csd == NULL)
		return;

	/* free the peer */
	if ((ret = rpma_peer_delete(&csd->peer)))
		librpma_td_verror(td, ret, "rpma_peer_delete");

	free(csd);
}

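/*
 * Open the server's "file": start a listening endpoint, allocate the
 * workspace (from DRAM when the file name is "malloc", otherwise from a PMem
 * file), register it, describe it in the connection's private data and accept
 * a single incoming connection.
 */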
int librpma_fio_server_open_file(struct thread_data *td, struct fio_file *f,
		struct rpma_conn_cfg *cfg)
{
	struct librpma_fio_server_data *csd = td->io_ops_data;
	struct librpma_fio_options_values *o = td->eo;
	enum rpma_conn_event conn_event = RPMA_CONN_UNDEFINED;
	struct librpma_fio_workspace ws = {0};
	struct rpma_conn_private_data pdata;
	uint32_t max_msg_num;
	struct rpma_conn_req *conn_req;
	struct rpma_conn *conn;
	struct rpma_mr_local *mr;
	char port_td[LIBRPMA_FIO_PORT_STR_LEN_MAX];
	struct rpma_ep *ep;
	size_t mem_size = td->o.size;
	size_t mr_desc_size;
	void *ws_ptr;
	int usage_mem_type;
	int ret;

	if (!f->file_name) {
		log_err("fio: filename is not set\n");
		return -1;
	}

	/* start a listening endpoint at addr:port */
	if (librpma_fio_td_port(o->port, td, port_td))
		return -1;

	if ((ret = rpma_ep_listen(csd->peer, o->server_ip, port_td, &ep))) {
		librpma_td_verror(td, ret, "rpma_ep_listen");
		return -1;
	}

	if (strcmp(f->file_name, "malloc") == 0) {
		/* allocation from DRAM using posix_memalign() */
		ws_ptr = librpma_fio_allocate_dram(td, mem_size, &csd->mem);
		usage_mem_type = RPMA_MR_USAGE_FLUSH_TYPE_VISIBILITY;
	} else {
		/* allocation from PMEM using pmem_map_file() */
		ws_ptr = librpma_fio_allocate_pmem(td, f->file_name,
				mem_size, &csd->mem);
		usage_mem_type = RPMA_MR_USAGE_FLUSH_TYPE_PERSISTENT;
	}

	if (ws_ptr == NULL)
		goto err_ep_shutdown;

	f->real_file_size = mem_size;

	if ((ret = rpma_mr_reg(csd->peer, ws_ptr, mem_size,
			RPMA_MR_USAGE_READ_DST | RPMA_MR_USAGE_READ_SRC |
			RPMA_MR_USAGE_WRITE_DST | RPMA_MR_USAGE_WRITE_SRC |
			usage_mem_type, &mr))) {
		librpma_td_verror(td, ret, "rpma_mr_reg");
		goto err_free;
	}

	/* get size of the memory region's descriptor */
	if ((ret = rpma_mr_get_descriptor_size(mr, &mr_desc_size))) {
		librpma_td_verror(td, ret, "rpma_mr_get_descriptor_size");
		goto err_mr_dereg;
	}

	/* verify size of the memory region's descriptor */
	if (mr_desc_size > LIBRPMA_FIO_DESCRIPTOR_MAX_SIZE) {
		log_err(
			"size of the memory region's descriptor is too big (max=%i)\n",
			LIBRPMA_FIO_DESCRIPTOR_MAX_SIZE);
		goto err_mr_dereg;
	}

	/* get the memory region's descriptor */
	if ((ret = rpma_mr_get_descriptor(mr, &ws.descriptor[0]))) {
		librpma_td_verror(td, ret, "rpma_mr_get_descriptor");
		goto err_mr_dereg;
	}

	if (cfg != NULL) {
		if ((ret = rpma_conn_cfg_get_rq_size(cfg, &max_msg_num))) {
			librpma_td_verror(td, ret, "rpma_conn_cfg_get_rq_size");
			goto err_mr_dereg;
		}

		/* verify whether iodepth fits into uint16_t */
		if (max_msg_num > UINT16_MAX) {
			log_err("fio: iodepth too big (%u > %u)\n",
				max_msg_num, UINT16_MAX);
			goto err_mr_dereg;
		}

		ws.max_msg_num = max_msg_num;
	}

	/* prepare a workspace description */
	ws.direct_write_to_pmem = o->direct_write_to_pmem;
	ws.mr_desc_size = mr_desc_size;
	pdata.ptr = &ws;
	pdata.len = sizeof(ws);

	/* receive an incoming connection request */
	if ((ret = rpma_ep_next_conn_req(ep, cfg, &conn_req))) {
		librpma_td_verror(td, ret, "rpma_ep_next_conn_req");
		goto err_mr_dereg;
	}

	if (csd->prepare_connection && csd->prepare_connection(td, conn_req))
		goto err_req_delete;

	/* accept the connection request and obtain the connection object */
	if ((ret = rpma_conn_req_connect(&conn_req, &pdata, &conn))) {
		librpma_td_verror(td, ret, "rpma_conn_req_connect");
		goto err_req_delete;
	}

	/* wait for the connection to be established */
	if ((ret = rpma_conn_next_event(conn, &conn_event))) {
		librpma_td_verror(td, ret, "rpma_conn_next_event");
		goto err_conn_delete;
	} else if (conn_event != RPMA_CONN_ESTABLISHED) {
		log_err("rpma_conn_next_event returned an unexpected event\n");
		goto err_conn_delete;
	}

	/* end-point is no longer needed */
	(void) rpma_ep_shutdown(&ep);

	csd->ws_mr = mr;
	csd->ws_ptr = ws_ptr;
	csd->conn = conn;

	return 0;

err_conn_delete:
	(void) rpma_conn_delete(&conn);

err_req_delete:
	(void) rpma_conn_req_delete(&conn_req);

err_mr_dereg:
	(void) rpma_mr_dereg(&mr);

err_free:
	librpma_fio_free(&csd->mem);

err_ep_shutdown:
	(void) rpma_ep_shutdown(&ep);

	return -1;
}

int librpma_fio_server_close_file(struct thread_data *td, struct fio_file *f)
{
	struct librpma_fio_server_data *csd = td->io_ops_data;
	enum rpma_conn_event conn_event = RPMA_CONN_UNDEFINED;
	int rv = 0;
	int ret;

	/* wait for the connection to be closed */
	ret = rpma_conn_next_event(csd->conn, &conn_event);
	if (!ret && conn_event != RPMA_CONN_CLOSED) {
		log_err("rpma_conn_next_event returned an unexpected event\n");
		rv = -1;
	}

	if ((ret = rpma_conn_disconnect(csd->conn))) {
		librpma_td_verror(td, ret, "rpma_conn_disconnect");
		rv = -1;
	}

	if ((ret = rpma_conn_delete(&csd->conn))) {
		librpma_td_verror(td, ret, "rpma_conn_delete");
		rv = -1;
	}

	if ((ret = rpma_mr_dereg(&csd->ws_mr))) {
		librpma_td_verror(td, ret, "rpma_mr_dereg");
		rv = -1;
	}

	librpma_fio_free(&csd->mem);

	return rv;
}