engines/librpma_fio.c
/*
 * librpma_fio: librpma_apm and librpma_gpspm engines' common part.
 *
 * Copyright 2021, Intel Corporation
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License,
 * version 2 as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 */

#include "librpma_fio.h"

#include <libpmem.h>

struct fio_option librpma_fio_options[] = {
	{
		.name = "serverip",
		.lname = "rpma_server_ip",
		.type = FIO_OPT_STR_STORE,
		.off1 = offsetof(struct librpma_fio_options_values, server_ip),
		.help = "IP address the server is listening on",
		.def = "",
		.category = FIO_OPT_C_ENGINE,
		.group = FIO_OPT_G_LIBRPMA,
	},
	{
		.name = "port",
		.lname = "rpma_server_port",
		.type = FIO_OPT_STR_STORE,
		.off1 = offsetof(struct librpma_fio_options_values, port),
		.help = "port the server is listening on",
		.def = "7204",
		.category = FIO_OPT_C_ENGINE,
		.group = FIO_OPT_G_LIBRPMA,
	},
	{
		.name = "direct_write_to_pmem",
		.lname = "Direct Write to PMem (via RDMA) from the remote host is possible",
		.type = FIO_OPT_BOOL,
		.off1 = offsetof(struct librpma_fio_options_values,
				direct_write_to_pmem),
		.help = "Set to true ONLY when Direct Write to PMem from the remote host is possible (https://pmem.io/rpma/documentation/basic-direct-write-to-pmem.html)",
		.def = "",
		.category = FIO_OPT_C_ENGINE,
		.group = FIO_OPT_G_LIBRPMA,
	},
	{
		.name = NULL,
	},
};
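/*
 * Illustrative only: a minimal job-file sketch showing how the options above
 * might be used. The ioengine name below assumes the librpma_apm engine pair
 * built on top of this common part; adjust it to the engine actually in use.
 *
 *	[global]
 *	ioengine=librpma_apm_client
 *	serverip=192.168.0.1
 *	port=7204
 *
 * Note that every thread derives its own port from the base "port" value
 * (see librpma_fio_td_port() below): thread N uses port + N - 1.
 */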

int librpma_fio_td_port(const char *port_base_str, struct thread_data *td,
		char *port_out)
{
	unsigned long int port_ul = strtoul(port_base_str, NULL, 10);
	unsigned int port_new;

	port_out[0] = '\0';

	if (port_ul == ULONG_MAX) {
		td_verror(td, errno, "strtoul");
		return -1;
	}
	port_ul += td->thread_number - 1;
	if (port_ul >= UINT_MAX) {
		log_err("[%u] port number (%lu) bigger than UINT_MAX\n",
			td->thread_number, port_ul);
		return -1;
	}

	port_new = port_ul;
	snprintf(port_out, LIBRPMA_FIO_PORT_STR_LEN_MAX - 1, "%u", port_new);

	return 0;
}

char *librpma_fio_allocate_dram(struct thread_data *td, size_t size,
		struct librpma_fio_mem *mem)
{
	char *mem_ptr = NULL;
	int ret;

	if ((ret = posix_memalign((void **)&mem_ptr, page_size, size))) {
		log_err("fio: posix_memalign() failed\n");
		td_verror(td, ret, "posix_memalign");
		return NULL;
	}

	mem->mem_ptr = mem_ptr;
	mem->size_mmap = 0;

	return mem_ptr;
}

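/*
 * Map the PMem file and return this thread's slice of it. The file is shared
 * by all server threads: thread N uses the region starting at (N - 1) * size,
 * so the mapped file must be at least (number of threads) * size bytes long
 * and size must be page-aligned.
 */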
char *librpma_fio_allocate_pmem(struct thread_data *td, const char *filename,
		size_t size, struct librpma_fio_mem *mem)
{
	size_t size_mmap = 0;
	char *mem_ptr = NULL;
	int is_pmem = 0;
	size_t ws_offset;

	if (size % page_size) {
		log_err("fio: size (%zu) is not aligned to page size (%zu)\n",
			size, page_size);
		return NULL;
	}

	ws_offset = (td->thread_number - 1) * size;

	if (!filename) {
		log_err("fio: filename is not set\n");
		return NULL;
	}

	/* map the file */
	mem_ptr = pmem_map_file(filename, 0 /* len */, 0 /* flags */,
			0 /* mode */, &size_mmap, &is_pmem);
	if (mem_ptr == NULL) {
		log_err("fio: pmem_map_file(%s) failed\n", filename);
		/* pmem_map_file() sets errno on failure */
		td_verror(td, errno, "pmem_map_file");
		return NULL;
	}

	/* pmem is expected */
	if (!is_pmem) {
		log_err("fio: %s is not located in persistent memory\n",
			filename);
		goto err_unmap;
	}

	/* check size of allocated persistent memory */
	if (size_mmap < ws_offset + size) {
		log_err(
			"fio: %s is too small to handle so many threads (%zu < %zu)\n",
			filename, size_mmap, ws_offset + size);
		goto err_unmap;
	}

	log_info("fio: size of memory mapped from the file %s: %zu\n",
		filename, size_mmap);

	mem->mem_ptr = mem_ptr;
	mem->size_mmap = size_mmap;

	return mem_ptr + ws_offset;

err_unmap:
	(void) pmem_unmap(mem_ptr, size_mmap);
	return NULL;
}

void librpma_fio_free(struct librpma_fio_mem *mem)
{
	if (mem->size_mmap)
		(void) pmem_unmap(mem->mem_ptr, mem->size_mmap);
	else
		free(mem->mem_ptr);
}

#define LIBRPMA_FIO_RETRY_MAX_NO 10
#define LIBRPMA_FIO_RETRY_DELAY_S 5

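/*
 * Common client-side setup: obtain the IBV context for the server's IP,
 * allocate the software queues (queued/flight/completed), create the peer,
 * and establish the connection (retrying rejected requests up to
 * LIBRPMA_FIO_RETRY_MAX_NO times). On success, the workspace description
 * received as the connection's private data is unpacked into ccd.
 */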
int librpma_fio_client_init(struct thread_data *td,
		struct rpma_conn_cfg *cfg)
{
	struct librpma_fio_client_data *ccd;
	struct librpma_fio_options_values *o = td->eo;
	struct ibv_context *dev = NULL;
	char port_td[LIBRPMA_FIO_PORT_STR_LEN_MAX];
	struct rpma_conn_req *req = NULL;
	enum rpma_conn_event event;
	struct rpma_conn_private_data pdata;
	enum rpma_log_level log_level_aux = RPMA_LOG_LEVEL_WARNING;
	int remote_flush_type;
	int retry;
	int ret;

	/* --debug=net sets RPMA_LOG_THRESHOLD_AUX to RPMA_LOG_LEVEL_INFO */
#ifdef FIO_INC_DEBUG
	if ((1UL << FD_NET) & fio_debug)
		log_level_aux = RPMA_LOG_LEVEL_INFO;
#endif

	/* configure logging thresholds to see more details */
	rpma_log_set_threshold(RPMA_LOG_THRESHOLD, RPMA_LOG_LEVEL_INFO);
	rpma_log_set_threshold(RPMA_LOG_THRESHOLD_AUX, log_level_aux);

	/* obtain an IBV context for a remote IP address */
	if ((ret = rpma_utils_get_ibv_context(o->server_ip,
			RPMA_UTIL_IBV_CONTEXT_REMOTE, &dev))) {
		librpma_td_verror(td, ret, "rpma_utils_get_ibv_context");
		return -1;
	}

	/* allocate client's data */
	ccd = calloc(1, sizeof(*ccd));
	if (ccd == NULL) {
		td_verror(td, errno, "calloc");
		return -1;
	}

	/* allocate all in-memory queues */
	ccd->io_us_queued = calloc(td->o.iodepth, sizeof(*ccd->io_us_queued));
	if (ccd->io_us_queued == NULL) {
		td_verror(td, errno, "calloc");
		goto err_free_ccd;
	}

	ccd->io_us_flight = calloc(td->o.iodepth, sizeof(*ccd->io_us_flight));
	if (ccd->io_us_flight == NULL) {
		td_verror(td, errno, "calloc");
		goto err_free_io_u_queues;
	}

	ccd->io_us_completed = calloc(td->o.iodepth,
			sizeof(*ccd->io_us_completed));
	if (ccd->io_us_completed == NULL) {
		td_verror(td, errno, "calloc");
		goto err_free_io_u_queues;
	}

	/* create a new peer object */
	if ((ret = rpma_peer_new(dev, &ccd->peer))) {
		librpma_td_verror(td, ret, "rpma_peer_new");
		goto err_free_io_u_queues;
	}

	/* create a connection request */
	if (librpma_fio_td_port(o->port, td, port_td))
		goto err_peer_delete;

	for (retry = 0; retry < LIBRPMA_FIO_RETRY_MAX_NO; retry++) {
		if ((ret = rpma_conn_req_new(ccd->peer, o->server_ip, port_td,
				cfg, &req))) {
			librpma_td_verror(td, ret, "rpma_conn_req_new");
			goto err_peer_delete;
		}

		/*
		 * Connect the connection request
		 * and obtain the connection object.
		 */
		if ((ret = rpma_conn_req_connect(&req, NULL, &ccd->conn))) {
			librpma_td_verror(td, ret, "rpma_conn_req_connect");
			goto err_req_delete;
		}

		/* wait for the connection to be established */
		if ((ret = rpma_conn_next_event(ccd->conn, &event))) {
			librpma_td_verror(td, ret, "rpma_conn_next_event");
			goto err_conn_delete;
		} else if (event == RPMA_CONN_ESTABLISHED) {
			break;
		} else if (event == RPMA_CONN_REJECTED) {
			(void) rpma_conn_disconnect(ccd->conn);
			(void) rpma_conn_delete(&ccd->conn);
			if (retry < LIBRPMA_FIO_RETRY_MAX_NO - 1) {
				log_err("Thread [%d]: Retrying (#%i) ...\n",
					td->thread_number, retry + 1);
				sleep(LIBRPMA_FIO_RETRY_DELAY_S);
			} else {
				log_err(
					"Thread [%d]: The maximum number of retries has been exceeded. Closing.\n",
					td->thread_number);
			}
		} else {
			log_err(
				"rpma_conn_next_event returned an unexpected event: (%s != RPMA_CONN_ESTABLISHED)\n",
				rpma_utils_conn_event_2str(event));
			goto err_conn_delete;
		}
	}

	if (retry > 0)
		log_err("Thread [%d]: Connected after retry #%i\n",
			td->thread_number, retry);

	if (ccd->conn == NULL)
		goto err_peer_delete;

	/* get the connection's private data sent from the server */
	if ((ret = rpma_conn_get_private_data(ccd->conn, &pdata))) {
		librpma_td_verror(td, ret, "rpma_conn_get_private_data");
		goto err_conn_delete;
	}

	/* get the server's workspace representation */
	ccd->ws = pdata.ptr;

	/* create the server's memory representation */
	if ((ret = rpma_mr_remote_from_descriptor(&ccd->ws->descriptor[0],
			ccd->ws->mr_desc_size, &ccd->server_mr))) {
		librpma_td_verror(td, ret, "rpma_mr_remote_from_descriptor");
		goto err_conn_delete;
	}

	/* get the total size of the shared server memory */
	if ((ret = rpma_mr_remote_get_size(ccd->server_mr, &ccd->ws_size))) {
		librpma_td_verror(td, ret, "rpma_mr_remote_get_size");
		goto err_conn_delete;
	}

	/* get flush type of the remote node */
	if ((ret = rpma_mr_remote_get_flush_type(ccd->server_mr,
			&remote_flush_type))) {
		librpma_td_verror(td, ret, "rpma_mr_remote_get_flush_type");
		goto err_conn_delete;
	}

	ccd->server_mr_flush_type =
		(remote_flush_type & RPMA_MR_USAGE_FLUSH_TYPE_PERSISTENT) ?
		RPMA_FLUSH_TYPE_PERSISTENT : RPMA_FLUSH_TYPE_VISIBILITY;

	/*
	 * Ensure that each io_u buffer allocation is page-size-aligned, which
	 * is required to register the memory for RDMA. The user-provided
	 * value is intentionally ignored.
	 */
	td->o.mem_align = page_size;

	td->io_ops_data = ccd;

	return 0;

err_conn_delete:
	(void) rpma_conn_disconnect(ccd->conn);
	(void) rpma_conn_delete(&ccd->conn);

err_req_delete:
	(void) rpma_conn_req_delete(&req);

err_peer_delete:
	(void) rpma_peer_delete(&ccd->peer);

err_free_io_u_queues:
	free(ccd->io_us_queued);
	free(ccd->io_us_flight);
	free(ccd->io_us_completed);

err_free_ccd:
	free(ccd);

	return -1;
}

void librpma_fio_client_cleanup(struct thread_data *td)
{
	struct librpma_fio_client_data *ccd = td->io_ops_data;
	enum rpma_conn_event ev;
	int ret;

	if (ccd == NULL)
		return;

	/* delete the io_us' memory registration */
	if ((ret = rpma_mr_dereg(&ccd->orig_mr)))
		librpma_td_verror(td, ret, "rpma_mr_dereg");
	/* delete the server's memory representation */
	if ((ret = rpma_mr_remote_delete(&ccd->server_mr)))
		librpma_td_verror(td, ret, "rpma_mr_remote_delete");
	/* initiate disconnection */
	if ((ret = rpma_conn_disconnect(ccd->conn)))
		librpma_td_verror(td, ret, "rpma_conn_disconnect");
	/* wait for the disconnection to complete */
	if ((ret = rpma_conn_next_event(ccd->conn, &ev))) {
		librpma_td_verror(td, ret, "rpma_conn_next_event");
	} else if (ev != RPMA_CONN_CLOSED) {
		log_err(
			"client_cleanup received an unexpected event (%s != RPMA_CONN_CLOSED)\n",
			rpma_utils_conn_event_2str(ev));
	}
	/* delete the connection */
	if ((ret = rpma_conn_delete(&ccd->conn)))
		librpma_td_verror(td, ret, "rpma_conn_delete");
	/* delete the peer */
	if ((ret = rpma_peer_delete(&ccd->peer)))
		librpma_td_verror(td, ret, "rpma_peer_delete");
	/* free the software queues */
	free(ccd->io_us_queued);
	free(ccd->io_us_flight);
	free(ccd->io_us_completed);
	free(ccd);
	td->io_ops_data = NULL; /* zero ccd */
}

int librpma_fio_file_nop(struct thread_data *td, struct fio_file *f)
{
	/* NOP */
	return 0;
}

int librpma_fio_client_post_init(struct thread_data *td)
{
	struct librpma_fio_client_data *ccd = td->io_ops_data;
	size_t io_us_size;
	int ret;

	/*
	 * td->orig_buffer is not aligned. The engine requires aligned io_us
	 * so FIO aligns up the address using the formula below.
	 */
	ccd->orig_buffer_aligned = PTR_ALIGN(td->orig_buffer, page_mask) +
			td->o.mem_align;

	/*
	 * Beside the space really consumed by io_us, td->orig_buffer_size
	 * also includes padding which can be omitted from the memory
	 * registration.
	 */
	io_us_size = (unsigned long long)td_max_bs(td) *
			(unsigned long long)td->o.iodepth;

	if ((ret = rpma_mr_reg(ccd->peer, ccd->orig_buffer_aligned, io_us_size,
			RPMA_MR_USAGE_READ_DST | RPMA_MR_USAGE_READ_SRC |
			RPMA_MR_USAGE_WRITE_DST | RPMA_MR_USAGE_WRITE_SRC |
			RPMA_MR_USAGE_FLUSH_TYPE_PERSISTENT, &ccd->orig_mr)))
		librpma_td_verror(td, ret, "rpma_mr_reg");
	return ret;
}

int librpma_fio_client_get_file_size(struct thread_data *td,
		struct fio_file *f)
{
	struct librpma_fio_client_data *ccd = td->io_ops_data;

	f->real_file_size = ccd->ws_size;
	fio_file_set_size_known(f);

	return 0;
}

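/*
 * Synchronous (sync_io=1) submission path: post a single read or a
 * write-plus-flush, busy-poll rpma_conn_completion_get() until the matching
 * completion arrives, then drain any outstanding SEND completions before
 * returning.
 */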
static enum fio_q_status client_queue_sync(struct thread_data *td,
		struct io_u *io_u)
{
	struct librpma_fio_client_data *ccd = td->io_ops_data;
	struct rpma_completion cmpl;
	unsigned io_u_index;
	int ret;

	/* execute io_u */
	if (io_u->ddir == DDIR_READ) {
		/* post an RDMA read operation */
		if (librpma_fio_client_io_read(td, io_u,
				RPMA_F_COMPLETION_ALWAYS))
			goto err;
	} else if (io_u->ddir == DDIR_WRITE) {
		/* post an RDMA write operation */
		if (librpma_fio_client_io_write(td, io_u))
			goto err;
		if (ccd->flush(td, io_u, io_u, io_u->xfer_buflen))
			goto err;
	} else {
		log_err("unsupported IO mode: %s\n", io_ddir_name(io_u->ddir));
		goto err;
	}

	do {
		/* get a completion */
		ret = rpma_conn_completion_get(ccd->conn, &cmpl);
		if (ret == RPMA_E_NO_COMPLETION) {
			/* lack of completion is not an error */
			continue;
		} else if (ret != 0) {
			/* an error occurred */
			librpma_td_verror(td, ret, "rpma_conn_completion_get");
			goto err;
		}

		/* if the io_u has completed with an error */
		if (cmpl.op_status != IBV_WC_SUCCESS)
			goto err;

		if (cmpl.op == RPMA_OP_SEND)
			++ccd->op_send_completed;
		else {
			if (cmpl.op == RPMA_OP_RECV)
				++ccd->op_recv_completed;

			break;
		}
	} while (1);

	if (ccd->get_io_u_index(&cmpl, &io_u_index) != 1)
		goto err;

	if (io_u->index != io_u_index) {
		log_err(
			"no matching io_u for received completion found (io_u_index=%u)\n",
			io_u_index);
		goto err;
	}

	/* make sure all SENDs are completed before exit - clean up SQ */
	if (librpma_fio_client_io_complete_all_sends(td))
		goto err;

	return FIO_Q_COMPLETED;

err:
	io_u->error = -1;
	return FIO_Q_COMPLETED;
}

enum fio_q_status librpma_fio_client_queue(struct thread_data *td,
		struct io_u *io_u)
{
	struct librpma_fio_client_data *ccd = td->io_ops_data;

	if (ccd->io_u_queued_nr == (int)td->o.iodepth)
		return FIO_Q_BUSY;

	if (td->o.sync_io)
		return client_queue_sync(td, io_u);

	/* io_u -> queued[] */
	ccd->io_us_queued[ccd->io_u_queued_nr] = io_u;
	ccd->io_u_queued_nr++;

	return FIO_Q_QUEUED;
}

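/*
 * Post all queued io_us. Sequential writes are coalesced: the flush is
 * postponed while the next queued io_u is also a write, so a single flush
 * covers the whole contiguous sequence. For example, four queued sequential
 * writes result in four RDMA writes followed by one flush spanning their
 * total length; with random writes each write is flushed individually.
 */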
int librpma_fio_client_commit(struct thread_data *td)
{
	struct librpma_fio_client_data *ccd = td->io_ops_data;
	int flags = RPMA_F_COMPLETION_ON_ERROR;
	struct timespec now;
	bool fill_time;
	int i;
	struct io_u *flush_first_io_u = NULL;
	unsigned long long int flush_len = 0;

	if (!ccd->io_us_queued)
		return -1;

	/* execute all io_us from queued[] */
	for (i = 0; i < ccd->io_u_queued_nr; i++) {
		struct io_u *io_u = ccd->io_us_queued[i];

		if (io_u->ddir == DDIR_READ) {
			if (i + 1 == ccd->io_u_queued_nr ||
			    ccd->io_us_queued[i + 1]->ddir == DDIR_WRITE)
				flags = RPMA_F_COMPLETION_ALWAYS;
			/* post an RDMA read operation */
			if (librpma_fio_client_io_read(td, io_u, flags))
				return -1;
		} else if (io_u->ddir == DDIR_WRITE) {
			/* post an RDMA write operation */
			if (librpma_fio_client_io_write(td, io_u))
				return -1;

			/* cache the first io_u in the sequence */
			if (flush_first_io_u == NULL)
				flush_first_io_u = io_u;

			/*
			 * the flush length is the sum of all io_us forming
			 * the sequence
			 */
			flush_len += io_u->xfer_buflen;

			/*
			 * if io_us are random, a flush is required after
			 * each one of them
			 */
			if (!td_random(td)) {
				/*
				 * When the io_us are sequential and
				 * the current io_u is not the last one and
				 * the next one is also a write operation,
				 * the flush can be postponed by one io_u and
				 * cover all of them which build a continuous
				 * sequence.
				 */
				if ((i + 1 < ccd->io_u_queued_nr) &&
				    (ccd->io_us_queued[i + 1]->ddir == DDIR_WRITE))
					continue;
			}

			/* flush all writes which build a continuous sequence */
			if (ccd->flush(td, flush_first_io_u, io_u, flush_len))
				return -1;

			/*
			 * reset the flush parameters in preparation for
			 * the next sequence
			 */
			flush_first_io_u = NULL;
			flush_len = 0;
		} else {
			log_err("unsupported IO mode: %s\n",
				io_ddir_name(io_u->ddir));
			return -1;
		}
	}

	if ((fill_time = fio_fill_issue_time(td)))
		fio_gettime(&now, NULL);

	/* move executed io_us from queued[] to flight[] */
	for (i = 0; i < ccd->io_u_queued_nr; i++) {
		struct io_u *io_u = ccd->io_us_queued[i];

		/* FIO does not do this if the engine is asynchronous */
		if (fill_time)
			memcpy(&io_u->issue_time, &now, sizeof(now));

		/* move executed io_us from queued[] to flight[] */
		ccd->io_us_flight[ccd->io_u_flight_nr] = io_u;
		ccd->io_u_flight_nr++;

		/*
		 * FIO says:
		 * If an engine has the commit hook
		 * it has to call io_u_queued() itself.
		 */
		io_u_queued(td, io_u);
	}

	/* FIO does not do this if an engine has the commit hook. */
	io_u_mark_submit(td, ccd->io_u_queued_nr);
	ccd->io_u_queued_nr = 0;

	return 0;
}

/*
 * RETURN VALUE
 * - > 0  - the number of completed io_us
 * - 0    - when no completions were received
 * - (-1) - when an error occurred
 */
static int client_getevent_process(struct thread_data *td)
{
	struct librpma_fio_client_data *ccd = td->io_ops_data;
	struct rpma_completion cmpl;
	/* io_u->index of completed io_u (cmpl.op_context) */
	unsigned int io_u_index;
	/* # of completed io_us */
	int cmpl_num = 0;
	/* helpers */
	struct io_u *io_u;
	int i;
	int ret;

	/* get a completion */
	if ((ret = rpma_conn_completion_get(ccd->conn, &cmpl))) {
		/* lack of completion is not an error */
		if (ret == RPMA_E_NO_COMPLETION)
			return 0;

		/* an error occurred */
		librpma_td_verror(td, ret, "rpma_conn_completion_get");
		return -1;
	}

	/* if the io_u has completed with an error */
	if (cmpl.op_status != IBV_WC_SUCCESS) {
		td->error = cmpl.op_status;
		return -1;
	}

	if (cmpl.op == RPMA_OP_SEND)
		++ccd->op_send_completed;
	else if (cmpl.op == RPMA_OP_RECV)
		++ccd->op_recv_completed;

	if ((ret = ccd->get_io_u_index(&cmpl, &io_u_index)) != 1)
		return ret;

	/* look for the completed io_u */
	for (i = 0; i < ccd->io_u_flight_nr; ++i) {
		if (ccd->io_us_flight[i]->index == io_u_index) {
			cmpl_num = i + 1;
			break;
		}
	}

	/* if no matching io_u has been found */
	if (cmpl_num == 0) {
		log_err(
			"no matching io_u for received completion found (io_u_index=%u)\n",
			io_u_index);
		return -1;
	}

	/* move completed io_us to the completed in-memory queue */
	for (i = 0; i < cmpl_num; ++i) {
		/* get and prepare io_u */
		io_u = ccd->io_us_flight[i];

		/* append to the queue */
		ccd->io_us_completed[ccd->io_u_completed_nr] = io_u;
		ccd->io_u_completed_nr++;
	}

	/* remove completed io_us from the flight queue */
	for (i = cmpl_num; i < ccd->io_u_flight_nr; ++i)
		ccd->io_us_flight[i - cmpl_num] = ccd->io_us_flight[i];
	ccd->io_u_flight_nr -= cmpl_num;

	return cmpl_num;
}

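/*
 * Reap completed io_us: at least min of them and, apart from the SEND/RECV
 * pacing case described below, at most max of them.
 */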
int librpma_fio_client_getevents(struct thread_data *td, unsigned int min,
		unsigned int max, const struct timespec *t)
{
	struct librpma_fio_client_data *ccd = td->io_ops_data;
	/* total # of completed io_us */
	int cmpl_num_total = 0;
	/* # of completed io_us from a single event */
	int cmpl_num;

	do {
		cmpl_num = client_getevent_process(td);
		if (cmpl_num > 0) {
			/* new completions collected */
			cmpl_num_total += cmpl_num;
		} else if (cmpl_num == 0) {
			/*
			 * It is required to make sure that CQEs for SENDs
			 * will flow at least at the same pace as CQEs for
			 * RECVs.
			 */
			if (cmpl_num_total >= min &&
			    ccd->op_send_completed >= ccd->op_recv_completed)
				break;

			/*
			 * To reduce CPU consumption one can use
			 * the rpma_conn_completion_wait() function.
			 * Note this greatly increases the latency
			 * and makes the results less stable.
			 * The bandwidth stays more or less the same.
			 */
		} else {
			/* an error occurred */
			return -1;
		}

		/*
		 * The expected max can be exceeded if CQEs for RECVs come up
		 * faster than CQEs for SENDs. But it is required to make sure
		 * CQEs for SENDs flow at least at the same pace as CQEs for
		 * RECVs.
		 */
	} while (cmpl_num_total < max ||
			ccd->op_send_completed < ccd->op_recv_completed);

	/*
	 * All posted SENDs are completed and RECVs for them (responses) are
	 * completed. This is the initial situation, so the counters are reset.
	 */
	if (ccd->op_send_posted == ccd->op_send_completed &&
			ccd->op_send_completed == ccd->op_recv_completed) {
		ccd->op_send_posted = 0;
		ccd->op_send_completed = 0;
		ccd->op_recv_completed = 0;
	}

	return cmpl_num_total;
}

struct io_u *librpma_fio_client_event(struct thread_data *td, int event)
{
	struct librpma_fio_client_data *ccd = td->io_ops_data;
	struct io_u *io_u;
	int i;

	/* get the first io_u from the queue */
	io_u = ccd->io_us_completed[0];

	/* remove the first io_u from the queue */
	for (i = 1; i < ccd->io_u_completed_nr; ++i)
		ccd->io_us_completed[i - 1] = ccd->io_us_completed[i];
	ccd->io_u_completed_nr--;

	dprint_io_u(io_u, "client_event");

	return io_u;
}

char *librpma_fio_client_errdetails(struct io_u *io_u)
{
	/* get the string representation of an error */
	enum ibv_wc_status status = io_u->error;
	const char *status_str = ibv_wc_status_str(status);

	char *details = strdup(status_str);
	if (details == NULL) {
		fprintf(stderr, "Error: %s\n", status_str);
		fprintf(stderr, "Fatal error: out of memory. Aborting.\n");
		abort();
	}

	/* FIO frees the returned string when it becomes obsolete */
	return details;
}

int librpma_fio_server_init(struct thread_data *td)
{
	struct librpma_fio_options_values *o = td->eo;
	struct librpma_fio_server_data *csd;
	struct ibv_context *dev = NULL;
	enum rpma_log_level log_level_aux = RPMA_LOG_LEVEL_WARNING;
	int ret = -1;

	/* --debug=net sets RPMA_LOG_THRESHOLD_AUX to RPMA_LOG_LEVEL_INFO */
#ifdef FIO_INC_DEBUG
	if ((1UL << FD_NET) & fio_debug)
		log_level_aux = RPMA_LOG_LEVEL_INFO;
#endif

	/* configure logging thresholds to see more details */
	rpma_log_set_threshold(RPMA_LOG_THRESHOLD, RPMA_LOG_LEVEL_INFO);
	rpma_log_set_threshold(RPMA_LOG_THRESHOLD_AUX, log_level_aux);

	/* obtain an IBV context for a local IP address */
	if ((ret = rpma_utils_get_ibv_context(o->server_ip,
			RPMA_UTIL_IBV_CONTEXT_LOCAL, &dev))) {
		librpma_td_verror(td, ret, "rpma_utils_get_ibv_context");
		return -1;
	}

	/* allocate server's data */
	csd = calloc(1, sizeof(*csd));
	if (csd == NULL) {
		td_verror(td, errno, "calloc");
		return -1;
	}

	/* create a new peer object */
	if ((ret = rpma_peer_new(dev, &csd->peer))) {
		librpma_td_verror(td, ret, "rpma_peer_new");
		goto err_free_csd;
	}

	td->io_ops_data = csd;

	return 0;

err_free_csd:
	free(csd);

	return -1;
}

void librpma_fio_server_cleanup(struct thread_data *td)
{
	struct librpma_fio_server_data *csd = td->io_ops_data;
	int ret;

	if (csd == NULL)
		return;

	/* delete the peer */
	if ((ret = rpma_peer_delete(&csd->peer)))
		librpma_td_verror(td, ret, "rpma_peer_delete");

	free(csd);
}

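/*
 * Server-side setup of a single workspace: listen on addr:port, allocate the
 * workspace from DRAM ("malloc") or from a PMem file, register it, pack its
 * description (memory region descriptor, flush capability, message limit)
 * into the connection's private data, and accept one incoming connection.
 */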
int librpma_fio_server_open_file(struct thread_data *td, struct fio_file *f,
		struct rpma_conn_cfg *cfg)
{
	struct librpma_fio_server_data *csd = td->io_ops_data;
	struct librpma_fio_options_values *o = td->eo;
	enum rpma_conn_event conn_event = RPMA_CONN_UNDEFINED;
	struct librpma_fio_workspace ws = {0};
	struct rpma_conn_private_data pdata;
	uint32_t max_msg_num;
	struct rpma_conn_req *conn_req;
	struct rpma_conn *conn;
	struct rpma_mr_local *mr;
	char port_td[LIBRPMA_FIO_PORT_STR_LEN_MAX];
	struct rpma_ep *ep;
	size_t mem_size = td->o.size;
	size_t mr_desc_size;
	void *ws_ptr;
	int usage_mem_type;
	int ret;

	if (!f->file_name) {
		log_err("fio: filename is not set\n");
		return -1;
	}

	/* start a listening endpoint at addr:port */
	if (librpma_fio_td_port(o->port, td, port_td))
		return -1;

	if ((ret = rpma_ep_listen(csd->peer, o->server_ip, port_td, &ep))) {
		librpma_td_verror(td, ret, "rpma_ep_listen");
		return -1;
	}

	if (strcmp(f->file_name, "malloc") == 0) {
		/* allocation from DRAM using posix_memalign() */
		ws_ptr = librpma_fio_allocate_dram(td, mem_size, &csd->mem);
		usage_mem_type = RPMA_MR_USAGE_FLUSH_TYPE_VISIBILITY;
	} else {
		/* allocation from PMEM using pmem_map_file() */
		ws_ptr = librpma_fio_allocate_pmem(td, f->file_name,
				mem_size, &csd->mem);
		usage_mem_type = RPMA_MR_USAGE_FLUSH_TYPE_PERSISTENT;
	}

	if (ws_ptr == NULL)
		goto err_ep_shutdown;

	f->real_file_size = mem_size;

	if ((ret = rpma_mr_reg(csd->peer, ws_ptr, mem_size,
			RPMA_MR_USAGE_READ_DST | RPMA_MR_USAGE_READ_SRC |
			RPMA_MR_USAGE_WRITE_DST | RPMA_MR_USAGE_WRITE_SRC |
			usage_mem_type, &mr))) {
		librpma_td_verror(td, ret, "rpma_mr_reg");
		goto err_free;
	}

	/* get size of the memory region's descriptor */
	if ((ret = rpma_mr_get_descriptor_size(mr, &mr_desc_size))) {
		librpma_td_verror(td, ret, "rpma_mr_get_descriptor_size");
		goto err_mr_dereg;
	}

	/* verify size of the memory region's descriptor */
	if (mr_desc_size > LIBRPMA_FIO_DESCRIPTOR_MAX_SIZE) {
		log_err(
			"size of the memory region's descriptor is too big (max=%i)\n",
			LIBRPMA_FIO_DESCRIPTOR_MAX_SIZE);
		goto err_mr_dereg;
	}

	/* get the memory region's descriptor */
	if ((ret = rpma_mr_get_descriptor(mr, &ws.descriptor[0]))) {
		librpma_td_verror(td, ret, "rpma_mr_get_descriptor");
		goto err_mr_dereg;
	}

	if (cfg != NULL) {
		if ((ret = rpma_conn_cfg_get_rq_size(cfg, &max_msg_num))) {
			librpma_td_verror(td, ret, "rpma_conn_cfg_get_rq_size");
			goto err_mr_dereg;
		}

		/* verify whether iodepth fits into uint16_t */
		if (max_msg_num > UINT16_MAX) {
			log_err("fio: iodepth too big (%u > %u)\n",
				max_msg_num, UINT16_MAX);
			goto err_mr_dereg;
		}

		ws.max_msg_num = max_msg_num;
	}

	/* prepare a workspace description */
	ws.direct_write_to_pmem = o->direct_write_to_pmem;
	ws.mr_desc_size = mr_desc_size;
	pdata.ptr = &ws;
	pdata.len = sizeof(ws);

	/* receive an incoming connection request */
	if ((ret = rpma_ep_next_conn_req(ep, cfg, &conn_req))) {
		librpma_td_verror(td, ret, "rpma_ep_next_conn_req");
		goto err_mr_dereg;
	}

	if (csd->prepare_connection && csd->prepare_connection(td, conn_req))
		goto err_req_delete;

	/* accept the connection request and obtain the connection object */
	if ((ret = rpma_conn_req_connect(&conn_req, &pdata, &conn))) {
		librpma_td_verror(td, ret, "rpma_conn_req_connect");
		goto err_req_delete;
	}

	/* wait for the connection to be established */
	if ((ret = rpma_conn_next_event(conn, &conn_event))) {
		librpma_td_verror(td, ret, "rpma_conn_next_event");
		goto err_conn_delete;
	} else if (conn_event != RPMA_CONN_ESTABLISHED) {
		log_err("rpma_conn_next_event returned an unexpected event\n");
		goto err_conn_delete;
	}

	/* the endpoint is no longer needed */
	(void) rpma_ep_shutdown(&ep);

	csd->ws_mr = mr;
	csd->ws_ptr = ws_ptr;
	csd->conn = conn;

	return 0;

err_conn_delete:
	(void) rpma_conn_delete(&conn);

err_req_delete:
	(void) rpma_conn_req_delete(&conn_req);

err_mr_dereg:
	(void) rpma_mr_dereg(&mr);

err_free:
	librpma_fio_free(&csd->mem);

err_ep_shutdown:
	(void) rpma_ep_shutdown(&ep);

	return -1;
}

int librpma_fio_server_close_file(struct thread_data *td, struct fio_file *f)
{
	struct librpma_fio_server_data *csd = td->io_ops_data;
	enum rpma_conn_event conn_event = RPMA_CONN_UNDEFINED;
	int rv = 0;
	int ret;

	/* wait for the connection to be closed */
	ret = rpma_conn_next_event(csd->conn, &conn_event);
	if (!ret && conn_event != RPMA_CONN_CLOSED) {
		log_err("rpma_conn_next_event returned an unexpected event\n");
		rv = -1;
	}

	if ((ret = rpma_conn_disconnect(csd->conn))) {
		librpma_td_verror(td, ret, "rpma_conn_disconnect");
		rv = -1;
	}

	if ((ret = rpma_conn_delete(&csd->conn))) {
		librpma_td_verror(td, ret, "rpma_conn_delete");
		rv = -1;
	}

	if ((ret = rpma_mr_dereg(&csd->ws_mr))) {
		librpma_td_verror(td, ret, "rpma_mr_dereg");
		rv = -1;
	}

	librpma_fio_free(&csd->mem);

	return rv;
}