t/nvmept_trim: increase transfer size for some tests
[fio.git] / engines / librpma_fio.c
1 /*
2  * librpma_fio: librpma_apm and librpma_gpspm engines' common part.
3  *
4  * Copyright 2021, Intel Corporation
5  *
6  * This program is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License,
8  * version 2 as published by the Free Software Foundation..
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13  * GNU General Public License for more details.
14  */
15
16 #include "librpma_fio.h"
17
18 #include <libpmem.h>
19
20 struct fio_option librpma_fio_options[] = {
21         {
22                 .name   = "serverip",
23                 .lname  = "rpma_server_ip",
24                 .type   = FIO_OPT_STR_STORE,
25                 .off1   = offsetof(struct librpma_fio_options_values, server_ip),
26                 .help   = "IP address the server is listening on",
27                 .def    = "",
28                 .category = FIO_OPT_C_ENGINE,
29                 .group  = FIO_OPT_G_LIBRPMA,
30         },
31         {
32                 .name   = "port",
33                 .lname  = "rpma_server port",
34                 .type   = FIO_OPT_STR_STORE,
35                 .off1   = offsetof(struct librpma_fio_options_values, port),
36                 .help   = "port the server is listening on",
37                 .def    = "7204",
38                 .category = FIO_OPT_C_ENGINE,
39                 .group  = FIO_OPT_G_LIBRPMA,
40         },
41         {
42                 .name   = "direct_write_to_pmem",
43                 .lname  = "Direct Write to PMem (via RDMA) from the remote host is possible",
44                 .type   = FIO_OPT_BOOL,
45                 .off1   = offsetof(struct librpma_fio_options_values,
46                                         direct_write_to_pmem),
47                 .help   = "Set to true ONLY when Direct Write to PMem from the remote host is possible (https://pmem.io/rpma/documentation/basic-direct-write-to-pmem.html)",
48                 .def    = "",
49                 .category = FIO_OPT_C_ENGINE,
50                 .group  = FIO_OPT_G_LIBRPMA,
51         },
52         {
53                 .name   = "busy_wait_polling",
54                 .lname  = "Set to 0 to wait for completion instead of busy-wait polling completion.",
55                 .type   = FIO_OPT_BOOL,
56                 .off1   = offsetof(struct librpma_fio_options_values,
57                                         busy_wait_polling),
58                 .help   = "Set to false if you want to reduce CPU usage",
59                 .def    = "1",
60                 .category = FIO_OPT_C_ENGINE,
61                 .group  = FIO_OPT_G_LIBRPMA,
62         },
63         {
64                 .name   = NULL,
65         },
66 };
67
68 int librpma_fio_td_port(const char *port_base_str, struct thread_data *td,
69                 char *port_out)
70 {
71         unsigned long int port_ul = strtoul(port_base_str, NULL, 10);
72         unsigned int port_new;
73
74         port_out[0] = '\0';
75
76         if (port_ul == ULONG_MAX) {
77                 td_verror(td, errno, "strtoul");
78                 return -1;
79         }
80         port_ul += td->thread_number - 1;
81         if (port_ul >= UINT_MAX) {
82                 log_err("[%u] port number (%lu) bigger than UINT_MAX\n",
83                         td->thread_number, port_ul);
84                 return -1;
85         }
86
87         port_new = port_ul;
88         snprintf(port_out, LIBRPMA_FIO_PORT_STR_LEN_MAX - 1, "%u", port_new);
89
90         return 0;
91 }
92
93 char *librpma_fio_allocate_dram(struct thread_data *td, size_t size,
94         struct librpma_fio_mem *mem)
95 {
96         char *mem_ptr = NULL;
97         int ret;
98
99         if ((ret = posix_memalign((void **)&mem_ptr, page_size, size))) {
100                 log_err("fio: posix_memalign() failed\n");
101                 td_verror(td, ret, "posix_memalign");
102                 return NULL;
103         }
104
105         mem->mem_ptr = mem_ptr;
106         mem->size_mmap = 0;
107
108         return mem_ptr;
109 }
110
111 char *librpma_fio_allocate_pmem(struct thread_data *td, const char *filename,
112                 size_t size, struct librpma_fio_mem *mem)
113 {
114         size_t size_mmap = 0;
115         char *mem_ptr = NULL;
116         int is_pmem = 0;
117         size_t ws_offset;
118
119         if (size % page_size) {
120                 log_err("fio: size (%zu) is not aligned to page size (%zu)\n",
121                         size, page_size);
122                 return NULL;
123         }
124
125         ws_offset = (td->thread_number - 1) * size;
126
127         if (!filename) {
128                 log_err("fio: filename is not set\n");
129                 return NULL;
130         }
131
132         /* map the file */
133         mem_ptr = pmem_map_file(filename, 0 /* len */, 0 /* flags */,
134                         0 /* mode */, &size_mmap, &is_pmem);
135         if (mem_ptr == NULL) {
136                 log_err("fio: pmem_map_file(%s) failed\n", filename);
137                 /* pmem_map_file() sets errno on failure */
138                 td_verror(td, errno, "pmem_map_file");
139                 return NULL;
140         }
141
142         /* pmem is expected */
143         if (!is_pmem) {
144                 log_err("fio: %s is not located in persistent memory\n",
145                         filename);
146                 goto err_unmap;
147         }
148
149         /* check size of allocated persistent memory */
150         if (size_mmap < ws_offset + size) {
151                 log_err(
152                         "fio: %s is too small to handle so many threads (%zu < %zu)\n",
153                         filename, size_mmap, ws_offset + size);
154                 goto err_unmap;
155         }
156
157         log_info("fio: size of memory mapped from the file %s: %zu\n",
158                 filename, size_mmap);
159
160         mem->mem_ptr = mem_ptr;
161         mem->size_mmap = size_mmap;
162
163         return mem_ptr + ws_offset;
164
165 err_unmap:
166         (void) pmem_unmap(mem_ptr, size_mmap);
167         return NULL;
168 }
169
170 void librpma_fio_free(struct librpma_fio_mem *mem)
171 {
172         if (mem->size_mmap)
173                 (void) pmem_unmap(mem->mem_ptr, mem->size_mmap);
174         else
175                 free(mem->mem_ptr);
176 }
177
178 #define LIBRPMA_FIO_RETRY_MAX_NO        10
179 #define LIBRPMA_FIO_RETRY_DELAY_S       5
180
181 int librpma_fio_client_init(struct thread_data *td,
182                 struct rpma_conn_cfg *cfg)
183 {
184         struct librpma_fio_client_data *ccd;
185         struct librpma_fio_options_values *o = td->eo;
186         struct ibv_context *dev = NULL;
187         char port_td[LIBRPMA_FIO_PORT_STR_LEN_MAX];
188         struct rpma_conn_req *req = NULL;
189         enum rpma_conn_event event;
190         struct rpma_conn_private_data pdata;
191         enum rpma_log_level log_level_aux = RPMA_LOG_LEVEL_WARNING;
192         int remote_flush_type;
193         int retry;
194         int ret;
195
196         /* --debug=net sets RPMA_LOG_THRESHOLD_AUX to RPMA_LOG_LEVEL_INFO */
197 #ifdef FIO_INC_DEBUG
198         if ((1UL << FD_NET) & fio_debug)
199                 log_level_aux = RPMA_LOG_LEVEL_INFO;
200 #endif
201
202         /* configure logging thresholds to see more details */
203         rpma_log_set_threshold(RPMA_LOG_THRESHOLD, RPMA_LOG_LEVEL_INFO);
204         rpma_log_set_threshold(RPMA_LOG_THRESHOLD_AUX, log_level_aux);
205
206         /* obtain an IBV context for a remote IP address */
207         if ((ret = rpma_utils_get_ibv_context(o->server_ip,
208                         RPMA_UTIL_IBV_CONTEXT_REMOTE, &dev))) {
209                 librpma_td_verror(td, ret, "rpma_utils_get_ibv_context");
210                 return -1;
211         }
212
213         /* allocate client's data */
214         ccd = calloc(1, sizeof(*ccd));
215         if (ccd == NULL) {
216                 td_verror(td, errno, "calloc");
217                 return -1;
218         }
219
220         /* allocate all in-memory queues */
221         ccd->io_us_queued = calloc(td->o.iodepth, sizeof(*ccd->io_us_queued));
222         if (ccd->io_us_queued == NULL) {
223                 td_verror(td, errno, "calloc");
224                 goto err_free_ccd;
225         }
226
227         ccd->io_us_flight = calloc(td->o.iodepth, sizeof(*ccd->io_us_flight));
228         if (ccd->io_us_flight == NULL) {
229                 td_verror(td, errno, "calloc");
230                 goto err_free_io_u_queues;
231         }
232
233         ccd->io_us_completed = calloc(td->o.iodepth,
234                         sizeof(*ccd->io_us_completed));
235         if (ccd->io_us_completed == NULL) {
236                 td_verror(td, errno, "calloc");
237                 goto err_free_io_u_queues;
238         }
239
240         /* create a new peer object */
241         if ((ret = rpma_peer_new(dev, &ccd->peer))) {
242                 librpma_td_verror(td, ret, "rpma_peer_new");
243                 goto err_free_io_u_queues;
244         }
245
246         /* create a connection request */
247         if (librpma_fio_td_port(o->port, td, port_td))
248                 goto err_peer_delete;
249
250         for (retry = 0; retry < LIBRPMA_FIO_RETRY_MAX_NO; retry++) {
251                 if ((ret = rpma_conn_req_new(ccd->peer, o->server_ip, port_td,
252                                 cfg, &req))) {
253                         librpma_td_verror(td, ret, "rpma_conn_req_new");
254                         goto err_peer_delete;
255                 }
256
257                 /*
258                  * Connect the connection request
259                  * and obtain the connection object.
260                  */
261                 if ((ret = rpma_conn_req_connect(&req, NULL, &ccd->conn))) {
262                         librpma_td_verror(td, ret, "rpma_conn_req_connect");
263                         goto err_req_delete;
264                 }
265
266                 /* wait for the connection to establish */
267                 if ((ret = rpma_conn_next_event(ccd->conn, &event))) {
268                         librpma_td_verror(td, ret, "rpma_conn_next_event");
269                         goto err_conn_delete;
270                 } else if (event == RPMA_CONN_ESTABLISHED) {
271                         break;
272                 } else if (event == RPMA_CONN_REJECTED) {
273                         (void) rpma_conn_disconnect(ccd->conn);
274                         (void) rpma_conn_delete(&ccd->conn);
275                         if (retry < LIBRPMA_FIO_RETRY_MAX_NO - 1) {
276                                 log_err("Thread [%d]: Retrying (#%i) ...\n",
277                                         td->thread_number, retry + 1);
278                                 sleep(LIBRPMA_FIO_RETRY_DELAY_S);
279                         } else {
280                                 log_err(
281                                         "Thread [%d]: The maximum number of retries exceeded. Closing.\n",
282                                         td->thread_number);
283                         }
284                 } else {
285                         log_err(
286                                 "rpma_conn_next_event returned an unexptected event: (%s != RPMA_CONN_ESTABLISHED)\n",
287                                 rpma_utils_conn_event_2str(event));
288                         goto err_conn_delete;
289                 }
290         }
291
292         if (retry > 0)
293                 log_err("Thread [%d]: Connected after retry #%i\n",
294                         td->thread_number, retry);
295
296         if (ccd->conn == NULL)
297                 goto err_peer_delete;
298
299         /* get the connection's private data sent from the server */
300         if ((ret = rpma_conn_get_private_data(ccd->conn, &pdata))) {
301                 librpma_td_verror(td, ret, "rpma_conn_get_private_data");
302                 goto err_conn_delete;
303         }
304
305         /* get the server's workspace representation */
306         ccd->ws = pdata.ptr;
307
308         /* create the server's memory representation */
309         if ((ret = rpma_mr_remote_from_descriptor(&ccd->ws->descriptor[0],
310                         ccd->ws->mr_desc_size, &ccd->server_mr))) {
311                 librpma_td_verror(td, ret, "rpma_mr_remote_from_descriptor");
312                 goto err_conn_delete;
313         }
314
315         /* get the total size of the shared server memory */
316         if ((ret = rpma_mr_remote_get_size(ccd->server_mr, &ccd->ws_size))) {
317                 librpma_td_verror(td, ret, "rpma_mr_remote_get_size");
318                 goto err_conn_delete;
319         }
320
321         /* get flush type of the remote node */
322         if ((ret = rpma_mr_remote_get_flush_type(ccd->server_mr,
323                         &remote_flush_type))) {
324                 librpma_td_verror(td, ret, "rpma_mr_remote_get_flush_type");
325                 goto err_conn_delete;
326         }
327
328         ccd->server_mr_flush_type =
329                 (remote_flush_type & RPMA_MR_USAGE_FLUSH_TYPE_PERSISTENT) ?
330                 RPMA_FLUSH_TYPE_PERSISTENT : RPMA_FLUSH_TYPE_VISIBILITY;
331
332         /*
333          * Assure an io_us buffer allocation is page-size-aligned which is required
334          * to register for RDMA. User-provided value is intentionally ignored.
335          */
336         td->o.mem_align = page_size;
337
338         td->io_ops_data = ccd;
339
340         return 0;
341
342 err_conn_delete:
343         (void) rpma_conn_disconnect(ccd->conn);
344         (void) rpma_conn_delete(&ccd->conn);
345
346 err_req_delete:
347         (void) rpma_conn_req_delete(&req);
348
349 err_peer_delete:
350         (void) rpma_peer_delete(&ccd->peer);
351
352 err_free_io_u_queues:
353         free(ccd->io_us_queued);
354         free(ccd->io_us_flight);
355         free(ccd->io_us_completed);
356
357 err_free_ccd:
358         free(ccd);
359
360         return -1;
361 }
362
363 void librpma_fio_client_cleanup(struct thread_data *td)
364 {
365         struct librpma_fio_client_data *ccd = td->io_ops_data;
366         enum rpma_conn_event ev;
367         int ret;
368
369         if (ccd == NULL)
370                 return;
371
372         /* delete the iou's memory registration */
373         if ((ret = rpma_mr_dereg(&ccd->orig_mr)))
374                 librpma_td_verror(td, ret, "rpma_mr_dereg");
375         /* delete the iou's memory registration */
376         if ((ret = rpma_mr_remote_delete(&ccd->server_mr)))
377                 librpma_td_verror(td, ret, "rpma_mr_remote_delete");
378         /* initiate disconnection */
379         if ((ret = rpma_conn_disconnect(ccd->conn)))
380                 librpma_td_verror(td, ret, "rpma_conn_disconnect");
381         /* wait for disconnection to end up */
382         if ((ret = rpma_conn_next_event(ccd->conn, &ev))) {
383                 librpma_td_verror(td, ret, "rpma_conn_next_event");
384         } else if (ev != RPMA_CONN_CLOSED) {
385                 log_err(
386                         "client_cleanup received an unexpected event (%s != RPMA_CONN_CLOSED)\n",
387                         rpma_utils_conn_event_2str(ev));
388         }
389         /* delete the connection */
390         if ((ret = rpma_conn_delete(&ccd->conn)))
391                 librpma_td_verror(td, ret, "rpma_conn_delete");
392         /* delete the peer */
393         if ((ret = rpma_peer_delete(&ccd->peer)))
394                 librpma_td_verror(td, ret, "rpma_peer_delete");
395         /* free the software queues */
396         free(ccd->io_us_queued);
397         free(ccd->io_us_flight);
398         free(ccd->io_us_completed);
399         free(ccd);
400         td->io_ops_data = NULL; /* zero ccd */
401 }
402
403 int librpma_fio_file_nop(struct thread_data *td, struct fio_file *f)
404 {
405         /* NOP */
406         return 0;
407 }
408
409 int librpma_fio_client_post_init(struct thread_data *td)
410 {
411         struct librpma_fio_client_data *ccd =  td->io_ops_data;
412         size_t io_us_size;
413         int ret;
414
415         /*
416          * td->orig_buffer is not aligned. The engine requires aligned io_us
417          * so FIO alignes up the address using the formula below.
418          */
419         ccd->orig_buffer_aligned = PTR_ALIGN(td->orig_buffer, page_mask) +
420                         td->o.mem_align;
421
422         /*
423          * td->orig_buffer_size beside the space really consumed by io_us
424          * has paddings which can be omitted for the memory registration.
425          */
426         io_us_size = (unsigned long long)td_max_bs(td) *
427                         (unsigned long long)td->o.iodepth;
428
429         if ((ret = rpma_mr_reg(ccd->peer, ccd->orig_buffer_aligned, io_us_size,
430                         RPMA_MR_USAGE_READ_DST | RPMA_MR_USAGE_READ_SRC |
431                         RPMA_MR_USAGE_WRITE_DST | RPMA_MR_USAGE_WRITE_SRC |
432                         RPMA_MR_USAGE_FLUSH_TYPE_PERSISTENT, &ccd->orig_mr)))
433                 librpma_td_verror(td, ret, "rpma_mr_reg");
434         return ret;
435 }
436
437 int librpma_fio_client_get_file_size(struct thread_data *td,
438                 struct fio_file *f)
439 {
440         struct librpma_fio_client_data *ccd = td->io_ops_data;
441
442         f->real_file_size = ccd->ws_size;
443         fio_file_set_size_known(f);
444
445         return 0;
446 }
447
448 static enum fio_q_status client_queue_sync(struct thread_data *td,
449                 struct io_u *io_u)
450 {
451         struct librpma_fio_client_data *ccd = td->io_ops_data;
452         struct rpma_completion cmpl;
453         unsigned io_u_index;
454         int ret;
455
456         /* execute io_u */
457         if (io_u->ddir == DDIR_READ) {
458                 /* post an RDMA read operation */
459                 if (librpma_fio_client_io_read(td, io_u,
460                                 RPMA_F_COMPLETION_ALWAYS))
461                         goto err;
462         } else if (io_u->ddir == DDIR_WRITE) {
463                 /* post an RDMA write operation */
464                 if (librpma_fio_client_io_write(td, io_u))
465                         goto err;
466                 if (ccd->flush(td, io_u, io_u, io_u->xfer_buflen))
467                         goto err;
468         } else {
469                 log_err("unsupported IO mode: %s\n", io_ddir_name(io_u->ddir));
470                 goto err;
471         }
472
473         do {
474                 /* get a completion */
475                 ret = rpma_conn_completion_get(ccd->conn, &cmpl);
476                 if (ret == RPMA_E_NO_COMPLETION) {
477                         /* lack of completion is not an error */
478                         continue;
479                 } else if (ret != 0) {
480                         /* an error occurred */
481                         librpma_td_verror(td, ret, "rpma_conn_completion_get");
482                         goto err;
483                 }
484
485                 /* if io_us has completed with an error */
486                 if (cmpl.op_status != IBV_WC_SUCCESS)
487                         goto err;
488
489                 if (cmpl.op == RPMA_OP_SEND)
490                         ++ccd->op_send_completed;
491                 else {
492                         if (cmpl.op == RPMA_OP_RECV)
493                                 ++ccd->op_recv_completed;
494
495                         break;
496                 }
497         } while (1);
498
499         if (ccd->get_io_u_index(&cmpl, &io_u_index) != 1)
500                 goto err;
501
502         if (io_u->index != io_u_index) {
503                 log_err(
504                         "no matching io_u for received completion found (io_u_index=%u)\n",
505                         io_u_index);
506                 goto err;
507         }
508
509         /* make sure all SENDs are completed before exit - clean up SQ */
510         if (librpma_fio_client_io_complete_all_sends(td))
511                 goto err;
512
513         return FIO_Q_COMPLETED;
514
515 err:
516         io_u->error = -1;
517         return FIO_Q_COMPLETED;
518 }
519
520 enum fio_q_status librpma_fio_client_queue(struct thread_data *td,
521                 struct io_u *io_u)
522 {
523         struct librpma_fio_client_data *ccd = td->io_ops_data;
524
525         if (ccd->io_u_queued_nr == (int)td->o.iodepth)
526                 return FIO_Q_BUSY;
527
528         if (td->o.sync_io)
529                 return client_queue_sync(td, io_u);
530
531         /* io_u -> queued[] */
532         ccd->io_us_queued[ccd->io_u_queued_nr] = io_u;
533         ccd->io_u_queued_nr++;
534
535         return FIO_Q_QUEUED;
536 }
537
538 int librpma_fio_client_commit(struct thread_data *td)
539 {
540         struct librpma_fio_client_data *ccd = td->io_ops_data;
541         int flags = RPMA_F_COMPLETION_ON_ERROR;
542         struct timespec now;
543         bool fill_time;
544         int i;
545         struct io_u *flush_first_io_u = NULL;
546         unsigned long long int flush_len = 0;
547
548         if (!ccd->io_us_queued)
549                 return -1;
550
551         /* execute all io_us from queued[] */
552         for (i = 0; i < ccd->io_u_queued_nr; i++) {
553                 struct io_u *io_u = ccd->io_us_queued[i];
554
555                 if (io_u->ddir == DDIR_READ) {
556                         if (i + 1 == ccd->io_u_queued_nr ||
557                             ccd->io_us_queued[i + 1]->ddir == DDIR_WRITE)
558                                 flags = RPMA_F_COMPLETION_ALWAYS;
559                         /* post an RDMA read operation */
560                         if (librpma_fio_client_io_read(td, io_u, flags))
561                                 return -1;
562                 } else if (io_u->ddir == DDIR_WRITE) {
563                         /* post an RDMA write operation */
564                         if (librpma_fio_client_io_write(td, io_u))
565                                 return -1;
566
567                         /* cache the first io_u in the sequence */
568                         if (flush_first_io_u == NULL)
569                                 flush_first_io_u = io_u;
570
571                         /*
572                          * the flush length is the sum of all io_u's creating
573                          * the sequence
574                          */
575                         flush_len += io_u->xfer_buflen;
576
577                         /*
578                          * if io_u's are random the rpma_flush is required
579                          * after each one of them
580                          */
581                         if (!td_random(td)) {
582                                 /*
583                                  * When the io_u's are sequential and
584                                  * the current io_u is not the last one and
585                                  * the next one is also a write operation
586                                  * the flush can be postponed by one io_u and
587                                  * cover all of them which build a continuous
588                                  * sequence.
589                                  */
590                                 if ((i + 1 < ccd->io_u_queued_nr) &&
591                                     (ccd->io_us_queued[i + 1]->ddir == DDIR_WRITE))
592                                         continue;
593                         }
594
595                         /* flush all writes which build a continuous sequence */
596                         if (ccd->flush(td, flush_first_io_u, io_u, flush_len))
597                                 return -1;
598
599                         /*
600                          * reset the flush parameters in preparation for
601                          * the next one
602                          */
603                         flush_first_io_u = NULL;
604                         flush_len = 0;
605                 } else {
606                         log_err("unsupported IO mode: %s\n",
607                                 io_ddir_name(io_u->ddir));
608                         return -1;
609                 }
610         }
611
612         if ((fill_time = fio_fill_issue_time(td)))
613                 fio_gettime(&now, NULL);
614
615         /* move executed io_us from queued[] to flight[] */
616         for (i = 0; i < ccd->io_u_queued_nr; i++) {
617                 struct io_u *io_u = ccd->io_us_queued[i];
618
619                 /* FIO does not do this if the engine is asynchronous */
620                 if (fill_time)
621                         memcpy(&io_u->issue_time, &now, sizeof(now));
622
623                 /* move executed io_us from queued[] to flight[] */
624                 ccd->io_us_flight[ccd->io_u_flight_nr] = io_u;
625                 ccd->io_u_flight_nr++;
626
627                 /*
628                  * FIO says:
629                  * If an engine has the commit hook
630                  * it has to call io_u_queued() itself.
631                  */
632                 io_u_queued(td, io_u);
633         }
634
635         /* FIO does not do this if an engine has the commit hook. */
636         io_u_mark_submit(td, ccd->io_u_queued_nr);
637         ccd->io_u_queued_nr = 0;
638
639         return 0;
640 }
641
642 /*
643  * RETURN VALUE
644  * - > 0  - a number of completed io_us
645  * -   0  - when no complicitions received
646  * - (-1) - when an error occurred
647  */
648 static int client_getevent_process(struct thread_data *td)
649 {
650         struct librpma_fio_client_data *ccd = td->io_ops_data;
651         struct rpma_completion cmpl;
652         /* io_u->index of completed io_u (cmpl.op_context) */
653         unsigned int io_u_index;
654         /* # of completed io_us */
655         int cmpl_num = 0;
656         /* helpers */
657         struct io_u *io_u;
658         int i;
659         int ret;
660
661         /* get a completion */
662         if ((ret = rpma_conn_completion_get(ccd->conn, &cmpl))) {
663                 /* lack of completion is not an error */
664                 if (ret == RPMA_E_NO_COMPLETION) {
665                         /* lack of completion is not an error */
666                         return 0;
667                 }
668
669                 /* an error occurred */
670                 librpma_td_verror(td, ret, "rpma_conn_completion_get");
671                 return -1;
672         }
673
674         /* if io_us has completed with an error */
675         if (cmpl.op_status != IBV_WC_SUCCESS) {
676                 td->error = cmpl.op_status;
677                 return -1;
678         }
679
680         if (cmpl.op == RPMA_OP_SEND)
681                 ++ccd->op_send_completed;
682         else if (cmpl.op == RPMA_OP_RECV)
683                 ++ccd->op_recv_completed;
684
685         if ((ret = ccd->get_io_u_index(&cmpl, &io_u_index)) != 1)
686                 return ret;
687
688         /* look for an io_u being completed */
689         for (i = 0; i < ccd->io_u_flight_nr; ++i) {
690                 if (ccd->io_us_flight[i]->index == io_u_index) {
691                         cmpl_num = i + 1;
692                         break;
693                 }
694         }
695
696         /* if no matching io_u has been found */
697         if (cmpl_num == 0) {
698                 log_err(
699                         "no matching io_u for received completion found (io_u_index=%u)\n",
700                         io_u_index);
701                 return -1;
702         }
703
704         /* move completed io_us to the completed in-memory queue */
705         for (i = 0; i < cmpl_num; ++i) {
706                 /* get and prepare io_u */
707                 io_u = ccd->io_us_flight[i];
708
709                 /* append to the queue */
710                 ccd->io_us_completed[ccd->io_u_completed_nr] = io_u;
711                 ccd->io_u_completed_nr++;
712         }
713
714         /* remove completed io_us from the flight queue */
715         for (i = cmpl_num; i < ccd->io_u_flight_nr; ++i)
716                 ccd->io_us_flight[i - cmpl_num] = ccd->io_us_flight[i];
717         ccd->io_u_flight_nr -= cmpl_num;
718
719         return cmpl_num;
720 }
721
722 int librpma_fio_client_getevents(struct thread_data *td, unsigned int min,
723                 unsigned int max, const struct timespec *t)
724 {
725         struct librpma_fio_client_data *ccd = td->io_ops_data;
726         /* total # of completed io_us */
727         int cmpl_num_total = 0;
728         /* # of completed io_us from a single event */
729         int cmpl_num;
730
731         do {
732                 cmpl_num = client_getevent_process(td);
733                 if (cmpl_num > 0) {
734                         /* new completions collected */
735                         cmpl_num_total += cmpl_num;
736                 } else if (cmpl_num == 0) {
737                         /*
738                          * It is required to make sure that CQEs for SENDs
739                          * will flow at least at the same pace as CQEs for RECVs.
740                          */
741                         if (cmpl_num_total >= min &&
742                             ccd->op_send_completed >= ccd->op_recv_completed)
743                                 break;
744
745                         /*
746                          * To reduce CPU consumption one can use
747                          * the rpma_conn_completion_wait() function.
748                          * Note this greatly increase the latency
749                          * and make the results less stable.
750                          * The bandwidth stays more or less the same.
751                          */
752                 } else {
753                         /* an error occurred */
754                         return -1;
755                 }
756
757                 /*
758                  * The expected max can be exceeded if CQEs for RECVs will come up
759                  * faster than CQEs for SENDs. But it is required to make sure CQEs for
760                  * SENDs will flow at least at the same pace as CQEs for RECVs.
761                  */
762         } while (cmpl_num_total < max ||
763                         ccd->op_send_completed < ccd->op_recv_completed);
764
765         /*
766          * All posted SENDs are completed and RECVs for them (responses) are
767          * completed. This is the initial situation so the counters are reset.
768          */
769         if (ccd->op_send_posted == ccd->op_send_completed &&
770                         ccd->op_send_completed == ccd->op_recv_completed) {
771                 ccd->op_send_posted = 0;
772                 ccd->op_send_completed = 0;
773                 ccd->op_recv_completed = 0;
774         }
775
776         return cmpl_num_total;
777 }
778
779 struct io_u *librpma_fio_client_event(struct thread_data *td, int event)
780 {
781         struct librpma_fio_client_data *ccd = td->io_ops_data;
782         struct io_u *io_u;
783         int i;
784
785         /* get the first io_u from the queue */
786         io_u = ccd->io_us_completed[0];
787
788         /* remove the first io_u from the queue */
789         for (i = 1; i < ccd->io_u_completed_nr; ++i)
790                 ccd->io_us_completed[i - 1] = ccd->io_us_completed[i];
791         ccd->io_u_completed_nr--;
792
793         dprint_io_u(io_u, "client_event");
794
795         return io_u;
796 }
797
798 char *librpma_fio_client_errdetails(struct io_u *io_u)
799 {
800         /* get the string representation of an error */
801         enum ibv_wc_status status = io_u->error;
802         const char *status_str = ibv_wc_status_str(status);
803
804         char *details = strdup(status_str);
805         if (details == NULL) {
806                 fprintf(stderr, "Error: %s\n", status_str);
807                 fprintf(stderr, "Fatal error: out of memory. Aborting.\n");
808                 abort();
809         }
810
811         /* FIO frees the returned string when it becomes obsolete */
812         return details;
813 }
814
815 int librpma_fio_server_init(struct thread_data *td)
816 {
817         struct librpma_fio_options_values *o = td->eo;
818         struct librpma_fio_server_data *csd;
819         struct ibv_context *dev = NULL;
820         enum rpma_log_level log_level_aux = RPMA_LOG_LEVEL_WARNING;
821         int ret = -1;
822
823         /* --debug=net sets RPMA_LOG_THRESHOLD_AUX to RPMA_LOG_LEVEL_INFO */
824 #ifdef FIO_INC_DEBUG
825         if ((1UL << FD_NET) & fio_debug)
826                 log_level_aux = RPMA_LOG_LEVEL_INFO;
827 #endif
828
829         /* configure logging thresholds to see more details */
830         rpma_log_set_threshold(RPMA_LOG_THRESHOLD, RPMA_LOG_LEVEL_INFO);
831         rpma_log_set_threshold(RPMA_LOG_THRESHOLD_AUX, log_level_aux);
832
833
834         /* obtain an IBV context for a remote IP address */
835         if ((ret = rpma_utils_get_ibv_context(o->server_ip,
836                         RPMA_UTIL_IBV_CONTEXT_LOCAL, &dev))) {
837                 librpma_td_verror(td, ret, "rpma_utils_get_ibv_context");
838                 return -1;
839         }
840
841         /* allocate server's data */
842         csd = calloc(1, sizeof(*csd));
843         if (csd == NULL) {
844                 td_verror(td, errno, "calloc");
845                 return -1;
846         }
847
848         /* create a new peer object */
849         if ((ret = rpma_peer_new(dev, &csd->peer))) {
850                 librpma_td_verror(td, ret, "rpma_peer_new");
851                 goto err_free_csd;
852         }
853
854         td->io_ops_data = csd;
855
856         return 0;
857
858 err_free_csd:
859         free(csd);
860
861         return -1;
862 }
863
864 void librpma_fio_server_cleanup(struct thread_data *td)
865 {
866         struct librpma_fio_server_data *csd =  td->io_ops_data;
867         int ret;
868
869         if (csd == NULL)
870                 return;
871
872         /* free the peer */
873         if ((ret = rpma_peer_delete(&csd->peer)))
874                 librpma_td_verror(td, ret, "rpma_peer_delete");
875
876         free(csd);
877 }
878
879 int librpma_fio_server_open_file(struct thread_data *td, struct fio_file *f,
880                 struct rpma_conn_cfg *cfg)
881 {
882         struct librpma_fio_server_data *csd = td->io_ops_data;
883         struct librpma_fio_options_values *o = td->eo;
884         enum rpma_conn_event conn_event = RPMA_CONN_UNDEFINED;
885         struct librpma_fio_workspace ws = {0};
886         struct rpma_conn_private_data pdata;
887         uint32_t max_msg_num;
888         struct rpma_conn_req *conn_req;
889         struct rpma_conn *conn;
890         struct rpma_mr_local *mr;
891         char port_td[LIBRPMA_FIO_PORT_STR_LEN_MAX];
892         struct rpma_ep *ep;
893         size_t mem_size = td->o.size;
894         size_t mr_desc_size;
895         void *ws_ptr;
896         int usage_mem_type;
897         int ret;
898
899         if (!f->file_name) {
900                 log_err("fio: filename is not set\n");
901                 return -1;
902         }
903
904         /* start a listening endpoint at addr:port */
905         if (librpma_fio_td_port(o->port, td, port_td))
906                 return -1;
907
908         if ((ret = rpma_ep_listen(csd->peer, o->server_ip, port_td, &ep))) {
909                 librpma_td_verror(td, ret, "rpma_ep_listen");
910                 return -1;
911         }
912
913         if (strcmp(f->file_name, "malloc") == 0) {
914                 /* allocation from DRAM using posix_memalign() */
915                 ws_ptr = librpma_fio_allocate_dram(td, mem_size, &csd->mem);
916                 usage_mem_type = RPMA_MR_USAGE_FLUSH_TYPE_VISIBILITY;
917         } else {
918                 /* allocation from PMEM using pmem_map_file() */
919                 ws_ptr = librpma_fio_allocate_pmem(td, f->file_name,
920                                 mem_size, &csd->mem);
921                 usage_mem_type = RPMA_MR_USAGE_FLUSH_TYPE_PERSISTENT;
922         }
923
924         if (ws_ptr == NULL)
925                 goto err_ep_shutdown;
926
927         f->real_file_size = mem_size;
928
929         if ((ret = rpma_mr_reg(csd->peer, ws_ptr, mem_size,
930                         RPMA_MR_USAGE_READ_DST | RPMA_MR_USAGE_READ_SRC |
931                         RPMA_MR_USAGE_WRITE_DST | RPMA_MR_USAGE_WRITE_SRC |
932                         usage_mem_type, &mr))) {
933                 librpma_td_verror(td, ret, "rpma_mr_reg");
934                 goto err_free;
935         }
936
937         /* get size of the memory region's descriptor */
938         if ((ret = rpma_mr_get_descriptor_size(mr, &mr_desc_size))) {
939                 librpma_td_verror(td, ret, "rpma_mr_get_descriptor_size");
940                 goto err_mr_dereg;
941         }
942
943         /* verify size of the memory region's descriptor */
944         if (mr_desc_size > LIBRPMA_FIO_DESCRIPTOR_MAX_SIZE) {
945                 log_err(
946                         "size of the memory region's descriptor is too big (max=%i)\n",
947                         LIBRPMA_FIO_DESCRIPTOR_MAX_SIZE);
948                 goto err_mr_dereg;
949         }
950
951         /* get the memory region's descriptor */
952         if ((ret = rpma_mr_get_descriptor(mr, &ws.descriptor[0]))) {
953                 librpma_td_verror(td, ret, "rpma_mr_get_descriptor");
954                 goto err_mr_dereg;
955         }
956
957         if (cfg != NULL) {
958                 if ((ret = rpma_conn_cfg_get_rq_size(cfg, &max_msg_num))) {
959                         librpma_td_verror(td, ret, "rpma_conn_cfg_get_rq_size");
960                         goto err_mr_dereg;
961                 }
962
963                 /* verify whether iodepth fits into uint16_t */
964                 if (max_msg_num > UINT16_MAX) {
965                         log_err("fio: iodepth too big (%u > %u)\n",
966                                 max_msg_num, UINT16_MAX);
967                         return -1;
968                 }
969
970                 ws.max_msg_num = max_msg_num;
971         }
972
973         /* prepare a workspace description */
974         ws.direct_write_to_pmem = o->direct_write_to_pmem;
975         ws.mr_desc_size = mr_desc_size;
976         pdata.ptr = &ws;
977         pdata.len = sizeof(ws);
978
979         /* receive an incoming connection request */
980         if ((ret = rpma_ep_next_conn_req(ep, cfg, &conn_req))) {
981                 librpma_td_verror(td, ret, "rpma_ep_next_conn_req");
982                 goto err_mr_dereg;
983         }
984
985         if (csd->prepare_connection && csd->prepare_connection(td, conn_req))
986                 goto err_req_delete;
987
988         /* accept the connection request and obtain the connection object */
989         if ((ret = rpma_conn_req_connect(&conn_req, &pdata, &conn))) {
990                 librpma_td_verror(td, ret, "rpma_conn_req_connect");
991                 goto err_req_delete;
992         }
993
994         /* wait for the connection to be established */
995         if ((ret = rpma_conn_next_event(conn, &conn_event))) {
996                 librpma_td_verror(td, ret, "rpma_conn_next_event");
997                 goto err_conn_delete;
998         } else if (conn_event != RPMA_CONN_ESTABLISHED) {
999                 log_err("rpma_conn_next_event returned an unexptected event\n");
1000                 goto err_conn_delete;
1001         }
1002
1003         /* end-point is no longer needed */
1004         (void) rpma_ep_shutdown(&ep);
1005
1006         csd->ws_mr = mr;
1007         csd->ws_ptr = ws_ptr;
1008         csd->conn = conn;
1009
1010         return 0;
1011
1012 err_conn_delete:
1013         (void) rpma_conn_delete(&conn);
1014
1015 err_req_delete:
1016         (void) rpma_conn_req_delete(&conn_req);
1017
1018 err_mr_dereg:
1019         (void) rpma_mr_dereg(&mr);
1020
1021 err_free:
1022         librpma_fio_free(&csd->mem);
1023
1024 err_ep_shutdown:
1025         (void) rpma_ep_shutdown(&ep);
1026
1027         return -1;
1028 }
1029
1030 int librpma_fio_server_close_file(struct thread_data *td, struct fio_file *f)
1031 {
1032         struct librpma_fio_server_data *csd = td->io_ops_data;
1033         enum rpma_conn_event conn_event = RPMA_CONN_UNDEFINED;
1034         int rv = 0;
1035         int ret;
1036
1037         /* wait for the connection to be closed */
1038         ret = rpma_conn_next_event(csd->conn, &conn_event);
1039         if (!ret && conn_event != RPMA_CONN_CLOSED) {
1040                 log_err("rpma_conn_next_event returned an unexptected event\n");
1041                 rv = -1;
1042         }
1043
1044         if ((ret = rpma_conn_disconnect(csd->conn))) {
1045                 librpma_td_verror(td, ret, "rpma_conn_disconnect");
1046                 rv = -1;
1047         }
1048
1049         if ((ret = rpma_conn_delete(&csd->conn))) {
1050                 librpma_td_verror(td, ret, "rpma_conn_delete");
1051                 rv = -1;
1052         }
1053
1054         if ((ret = rpma_mr_dereg(&csd->ws_mr))) {
1055                 librpma_td_verror(td, ret, "rpma_mr_dereg");
1056                 rv = -1;
1057         }
1058
1059         librpma_fio_free(&csd->mem);
1060
1061         return rv;
1062 }