init: include 5 in --terse-version help
[fio.git] / engines / librpma_fio.c
1 /*
2  * librpma_fio: librpma_apm and librpma_gpspm engines' common part.
3  *
4  * Copyright 2021, Intel Corporation
5  *
6  * This program is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License,
8  * version 2 as published by the Free Software Foundation..
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13  * GNU General Public License for more details.
14  */
15
16 #include "librpma_fio.h"
17
18 #include <libpmem.h>
19
20 struct fio_option librpma_fio_options[] = {
21         {
22                 .name   = "serverip",
23                 .lname  = "rpma_server_ip",
24                 .type   = FIO_OPT_STR_STORE,
25                 .off1   = offsetof(struct librpma_fio_options_values, server_ip),
26                 .help   = "IP address the server is listening on",
27                 .def    = "",
28                 .category = FIO_OPT_C_ENGINE,
29                 .group  = FIO_OPT_G_LIBRPMA,
30         },
31         {
32                 .name   = "port",
33                 .lname  = "rpma_server port",
34                 .type   = FIO_OPT_STR_STORE,
35                 .off1   = offsetof(struct librpma_fio_options_values, port),
36                 .help   = "port the server is listening on",
37                 .def    = "7204",
38                 .category = FIO_OPT_C_ENGINE,
39                 .group  = FIO_OPT_G_LIBRPMA,
40         },
41         {
42                 .name   = "direct_write_to_pmem",
43                 .lname  = "Direct Write to PMem (via RDMA) from the remote host is possible",
44                 .type   = FIO_OPT_BOOL,
45                 .off1   = offsetof(struct librpma_fio_options_values,
46                                         direct_write_to_pmem),
47                 .help   = "Set to true ONLY when Direct Write to PMem from the remote host is possible (https://pmem.io/rpma/documentation/basic-direct-write-to-pmem.html)",
48                 .def    = "",
49                 .category = FIO_OPT_C_ENGINE,
50                 .group  = FIO_OPT_G_LIBRPMA,
51         },
52         {
53                 .name   = "busy_wait_polling",
54                 .lname  = "Set to 0 to wait for completion instead of busy-wait polling completion.",
55                 .type   = FIO_OPT_BOOL,
56                 .off1   = offsetof(struct librpma_fio_options_values,
57                                         busy_wait_polling),
58                 .help   = "Set to false if you want to reduce CPU usage",
59                 .def    = "1",
60                 .category = FIO_OPT_C_ENGINE,
61                 .group  = FIO_OPT_G_LIBRPMA,
62         },
63         {
64                 .name   = NULL,
65         },
66 };
67
68 int librpma_fio_td_port(const char *port_base_str, struct thread_data *td,
69                 char *port_out)
70 {
71         unsigned long int port_ul = strtoul(port_base_str, NULL, 10);
72         unsigned int port_new;
73
74         port_out[0] = '\0';
75
76         if (port_ul == ULONG_MAX) {
77                 td_verror(td, errno, "strtoul");
78                 return -1;
79         }
80         port_ul += td->thread_number - 1;
81         if (port_ul >= UINT_MAX) {
82                 log_err("[%u] port number (%lu) bigger than UINT_MAX\n",
83                         td->thread_number, port_ul);
84                 return -1;
85         }
86
87         port_new = port_ul;
88         snprintf(port_out, LIBRPMA_FIO_PORT_STR_LEN_MAX - 1, "%u", port_new);
89
90         return 0;
91 }
92
93 char *librpma_fio_allocate_dram(struct thread_data *td, size_t size,
94         struct librpma_fio_mem *mem)
95 {
96         char *mem_ptr = NULL;
97         int ret;
98
99         if ((ret = posix_memalign((void **)&mem_ptr, page_size, size))) {
100                 log_err("fio: posix_memalign() failed\n");
101                 td_verror(td, ret, "posix_memalign");
102                 return NULL;
103         }
104
105         mem->mem_ptr = mem_ptr;
106         mem->size_mmap = 0;
107
108         return mem_ptr;
109 }
110
111 char *librpma_fio_allocate_pmem(struct thread_data *td, struct fio_file *f,
112                 size_t size, struct librpma_fio_mem *mem)
113 {
114         size_t size_mmap = 0;
115         char *mem_ptr = NULL;
116         int is_pmem = 0;
117         size_t ws_offset;
118
119         if (size % page_size) {
120                 log_err("fio: size (%zu) is not aligned to page size (%zu)\n",
121                         size, page_size);
122                 return NULL;
123         }
124
125         if (f->filetype == FIO_TYPE_CHAR) {
126                 /* Each thread uses a separate offset within DeviceDAX. */
127                 ws_offset = (td->thread_number - 1) * size;
128         } else {
129                 /* Each thread uses a separate FileSystemDAX file. No offset is needed. */
130                 ws_offset = 0;
131         }
132
133         if (!f->file_name) {
134                 log_err("fio: filename is not set\n");
135                 return NULL;
136         }
137
138         /* map the file */
139         mem_ptr = pmem_map_file(f->file_name, 0 /* len */, 0 /* flags */,
140                         0 /* mode */, &size_mmap, &is_pmem);
141         if (mem_ptr == NULL) {
142                 log_err("fio: pmem_map_file(%s) failed\n", f->file_name);
143                 /* pmem_map_file() sets errno on failure */
144                 td_verror(td, errno, "pmem_map_file");
145                 return NULL;
146         }
147
148         /* pmem is expected */
149         if (!is_pmem) {
150                 log_err("fio: %s is not located in persistent memory\n",
151                         f->file_name);
152                 goto err_unmap;
153         }
154
155         /* check size of allocated persistent memory */
156         if (size_mmap < ws_offset + size) {
157                 log_err(
158                         "fio: %s is too small to handle so many threads (%zu < %zu)\n",
159                         f->file_name, size_mmap, ws_offset + size);
160                 goto err_unmap;
161         }
162
163         log_info("fio: size of memory mapped from the file %s: %zu\n",
164                 f->file_name, size_mmap);
165
166         mem->mem_ptr = mem_ptr;
167         mem->size_mmap = size_mmap;
168
169         return mem_ptr + ws_offset;
170
171 err_unmap:
172         (void) pmem_unmap(mem_ptr, size_mmap);
173         return NULL;
174 }
175
176 void librpma_fio_free(struct librpma_fio_mem *mem)
177 {
178         if (mem->size_mmap)
179                 (void) pmem_unmap(mem->mem_ptr, mem->size_mmap);
180         else
181                 free(mem->mem_ptr);
182 }
183
184 #define LIBRPMA_FIO_RETRY_MAX_NO        10
185 #define LIBRPMA_FIO_RETRY_DELAY_S       5
186
187 int librpma_fio_client_init(struct thread_data *td,
188                 struct rpma_conn_cfg *cfg)
189 {
190         struct librpma_fio_client_data *ccd;
191         struct librpma_fio_options_values *o = td->eo;
192         struct ibv_context *dev = NULL;
193         char port_td[LIBRPMA_FIO_PORT_STR_LEN_MAX];
194         struct rpma_conn_req *req = NULL;
195         enum rpma_conn_event event;
196         struct rpma_conn_private_data pdata;
197         enum rpma_log_level log_level_aux = RPMA_LOG_LEVEL_WARNING;
198         int remote_flush_type;
199         int retry;
200         int ret;
201
202         /* --debug=net sets RPMA_LOG_THRESHOLD_AUX to RPMA_LOG_LEVEL_INFO */
203 #ifdef FIO_INC_DEBUG
204         if ((1UL << FD_NET) & fio_debug)
205                 log_level_aux = RPMA_LOG_LEVEL_INFO;
206 #endif
207
208         /* configure logging thresholds to see more details */
209         rpma_log_set_threshold(RPMA_LOG_THRESHOLD, RPMA_LOG_LEVEL_INFO);
210         rpma_log_set_threshold(RPMA_LOG_THRESHOLD_AUX, log_level_aux);
211
212         /* obtain an IBV context for a remote IP address */
213         if ((ret = rpma_utils_get_ibv_context(o->server_ip,
214                         RPMA_UTIL_IBV_CONTEXT_REMOTE, &dev))) {
215                 librpma_td_verror(td, ret, "rpma_utils_get_ibv_context");
216                 return -1;
217         }
218
219         /* allocate client's data */
220         ccd = calloc(1, sizeof(*ccd));
221         if (ccd == NULL) {
222                 td_verror(td, errno, "calloc");
223                 return -1;
224         }
225
226         /* allocate all in-memory queues */
227         ccd->io_us_queued = calloc(td->o.iodepth, sizeof(*ccd->io_us_queued));
228         if (ccd->io_us_queued == NULL) {
229                 td_verror(td, errno, "calloc");
230                 goto err_free_ccd;
231         }
232
233         ccd->io_us_flight = calloc(td->o.iodepth, sizeof(*ccd->io_us_flight));
234         if (ccd->io_us_flight == NULL) {
235                 td_verror(td, errno, "calloc");
236                 goto err_free_io_u_queues;
237         }
238
239         ccd->io_us_completed = calloc(td->o.iodepth,
240                         sizeof(*ccd->io_us_completed));
241         if (ccd->io_us_completed == NULL) {
242                 td_verror(td, errno, "calloc");
243                 goto err_free_io_u_queues;
244         }
245
246         /* create a new peer object */
247         if ((ret = rpma_peer_new(dev, &ccd->peer))) {
248                 librpma_td_verror(td, ret, "rpma_peer_new");
249                 goto err_free_io_u_queues;
250         }
251
252         /* create a connection request */
253         if (librpma_fio_td_port(o->port, td, port_td))
254                 goto err_peer_delete;
255
256         for (retry = 0; retry < LIBRPMA_FIO_RETRY_MAX_NO; retry++) {
257                 if ((ret = rpma_conn_req_new(ccd->peer, o->server_ip, port_td,
258                                 cfg, &req))) {
259                         librpma_td_verror(td, ret, "rpma_conn_req_new");
260                         goto err_peer_delete;
261                 }
262
263                 /*
264                  * Connect the connection request
265                  * and obtain the connection object.
266                  */
267                 if ((ret = rpma_conn_req_connect(&req, NULL, &ccd->conn))) {
268                         librpma_td_verror(td, ret, "rpma_conn_req_connect");
269                         goto err_req_delete;
270                 }
271
272                 /* wait for the connection to establish */
273                 if ((ret = rpma_conn_next_event(ccd->conn, &event))) {
274                         librpma_td_verror(td, ret, "rpma_conn_next_event");
275                         goto err_conn_delete;
276                 } else if (event == RPMA_CONN_ESTABLISHED) {
277                         break;
278                 } else if (event == RPMA_CONN_REJECTED) {
279                         (void) rpma_conn_disconnect(ccd->conn);
280                         (void) rpma_conn_delete(&ccd->conn);
281                         if (retry < LIBRPMA_FIO_RETRY_MAX_NO - 1) {
282                                 log_err("Thread [%d]: Retrying (#%i) ...\n",
283                                         td->thread_number, retry + 1);
284                                 sleep(LIBRPMA_FIO_RETRY_DELAY_S);
285                         } else {
286                                 log_err(
287                                         "Thread [%d]: The maximum number of retries exceeded. Closing.\n",
288                                         td->thread_number);
289                         }
290                 } else {
291                         log_err(
292                                 "rpma_conn_next_event returned an unexptected event: (%s != RPMA_CONN_ESTABLISHED)\n",
293                                 rpma_utils_conn_event_2str(event));
294                         goto err_conn_delete;
295                 }
296         }
297
298         if (retry > 0)
299                 log_err("Thread [%d]: Connected after retry #%i\n",
300                         td->thread_number, retry);
301
302         if (ccd->conn == NULL)
303                 goto err_peer_delete;
304
305         /* get the connection's main CQ */
306         if ((ret = rpma_conn_get_cq(ccd->conn, &ccd->cq))) {
307                 librpma_td_verror(td, ret, "rpma_conn_get_cq");
308                 goto err_conn_delete;
309         }
310
311         /* get the connection's private data sent from the server */
312         if ((ret = rpma_conn_get_private_data(ccd->conn, &pdata))) {
313                 librpma_td_verror(td, ret, "rpma_conn_get_private_data");
314                 goto err_conn_delete;
315         }
316
317         /* get the server's workspace representation */
318         ccd->ws = pdata.ptr;
319
320         /* create the server's memory representation */
321         if ((ret = rpma_mr_remote_from_descriptor(&ccd->ws->descriptor[0],
322                         ccd->ws->mr_desc_size, &ccd->server_mr))) {
323                 librpma_td_verror(td, ret, "rpma_mr_remote_from_descriptor");
324                 goto err_conn_delete;
325         }
326
327         /* get the total size of the shared server memory */
328         if ((ret = rpma_mr_remote_get_size(ccd->server_mr, &ccd->ws_size))) {
329                 librpma_td_verror(td, ret, "rpma_mr_remote_get_size");
330                 goto err_conn_delete;
331         }
332
333         /* get flush type of the remote node */
334         if ((ret = rpma_mr_remote_get_flush_type(ccd->server_mr,
335                         &remote_flush_type))) {
336                 librpma_td_verror(td, ret, "rpma_mr_remote_get_flush_type");
337                 goto err_conn_delete;
338         }
339
340         ccd->server_mr_flush_type =
341                 (remote_flush_type & RPMA_MR_USAGE_FLUSH_TYPE_PERSISTENT) ?
342                 RPMA_FLUSH_TYPE_PERSISTENT : RPMA_FLUSH_TYPE_VISIBILITY;
343
344         /*
345          * Assure an io_us buffer allocation is page-size-aligned which is required
346          * to register for RDMA. User-provided value is intentionally ignored.
347          */
348         td->o.mem_align = page_size;
349
350         td->io_ops_data = ccd;
351
352         return 0;
353
354 err_conn_delete:
355         (void) rpma_conn_disconnect(ccd->conn);
356         (void) rpma_conn_delete(&ccd->conn);
357
358 err_req_delete:
359         (void) rpma_conn_req_delete(&req);
360
361 err_peer_delete:
362         (void) rpma_peer_delete(&ccd->peer);
363
364 err_free_io_u_queues:
365         free(ccd->io_us_queued);
366         free(ccd->io_us_flight);
367         free(ccd->io_us_completed);
368
369 err_free_ccd:
370         free(ccd);
371
372         return -1;
373 }
374
375 void librpma_fio_client_cleanup(struct thread_data *td)
376 {
377         struct librpma_fio_client_data *ccd = td->io_ops_data;
378         enum rpma_conn_event ev;
379         int ret;
380
381         if (ccd == NULL)
382                 return;
383
384         /* delete the iou's memory registration */
385         if ((ret = rpma_mr_dereg(&ccd->orig_mr)))
386                 librpma_td_verror(td, ret, "rpma_mr_dereg");
387         /* delete the iou's memory registration */
388         if ((ret = rpma_mr_remote_delete(&ccd->server_mr)))
389                 librpma_td_verror(td, ret, "rpma_mr_remote_delete");
390         /* initiate disconnection */
391         if ((ret = rpma_conn_disconnect(ccd->conn)))
392                 librpma_td_verror(td, ret, "rpma_conn_disconnect");
393         /* wait for disconnection to end up */
394         if ((ret = rpma_conn_next_event(ccd->conn, &ev))) {
395                 librpma_td_verror(td, ret, "rpma_conn_next_event");
396         } else if (ev != RPMA_CONN_CLOSED) {
397                 log_err(
398                         "client_cleanup received an unexpected event (%s != RPMA_CONN_CLOSED)\n",
399                         rpma_utils_conn_event_2str(ev));
400         }
401         /* delete the connection */
402         if ((ret = rpma_conn_delete(&ccd->conn)))
403                 librpma_td_verror(td, ret, "rpma_conn_delete");
404         /* delete the peer */
405         if ((ret = rpma_peer_delete(&ccd->peer)))
406                 librpma_td_verror(td, ret, "rpma_peer_delete");
407         /* free the software queues */
408         free(ccd->io_us_queued);
409         free(ccd->io_us_flight);
410         free(ccd->io_us_completed);
411         free(ccd);
412         td->io_ops_data = NULL; /* zero ccd */
413 }
414
415 int librpma_fio_file_nop(struct thread_data *td, struct fio_file *f)
416 {
417         /* NOP */
418         return 0;
419 }
420
421 int librpma_fio_client_post_init(struct thread_data *td)
422 {
423         struct librpma_fio_client_data *ccd =  td->io_ops_data;
424         size_t io_us_size;
425         int ret;
426
427         /*
428          * td->orig_buffer is not aligned. The engine requires aligned io_us
429          * so FIO aligns up the address using the formula below.
430          */
431         ccd->orig_buffer_aligned = PTR_ALIGN(td->orig_buffer, page_mask) +
432                         td->o.mem_align;
433
434         /*
435          * td->orig_buffer_size beside the space really consumed by io_us
436          * has paddings which can be omitted for the memory registration.
437          */
438         io_us_size = (unsigned long long)td_max_bs(td) *
439                         (unsigned long long)td->o.iodepth;
440
441         if ((ret = rpma_mr_reg(ccd->peer, ccd->orig_buffer_aligned, io_us_size,
442                         RPMA_MR_USAGE_READ_DST | RPMA_MR_USAGE_READ_SRC |
443                         RPMA_MR_USAGE_WRITE_DST | RPMA_MR_USAGE_WRITE_SRC |
444                         RPMA_MR_USAGE_FLUSH_TYPE_PERSISTENT, &ccd->orig_mr)))
445                 librpma_td_verror(td, ret, "rpma_mr_reg");
446         return ret;
447 }
448
449 int librpma_fio_client_get_file_size(struct thread_data *td,
450                 struct fio_file *f)
451 {
452         struct librpma_fio_client_data *ccd = td->io_ops_data;
453
454         f->real_file_size = ccd->ws_size;
455         fio_file_set_size_known(f);
456
457         return 0;
458 }
459
460 static enum fio_q_status client_queue_sync(struct thread_data *td,
461                 struct io_u *io_u)
462 {
463         struct librpma_fio_client_data *ccd = td->io_ops_data;
464         struct ibv_wc wc;
465         unsigned io_u_index;
466         int ret;
467
468         /* execute io_u */
469         if (io_u->ddir == DDIR_READ) {
470                 /* post an RDMA read operation */
471                 if (librpma_fio_client_io_read(td, io_u,
472                                 RPMA_F_COMPLETION_ALWAYS))
473                         goto err;
474         } else if (io_u->ddir == DDIR_WRITE) {
475                 /* post an RDMA write operation */
476                 if (librpma_fio_client_io_write(td, io_u))
477                         goto err;
478                 if (ccd->flush(td, io_u, io_u, io_u->xfer_buflen))
479                         goto err;
480         } else {
481                 log_err("unsupported IO mode: %s\n", io_ddir_name(io_u->ddir));
482                 goto err;
483         }
484
485         do {
486                 /* get a completion */
487                 ret = rpma_cq_get_wc(ccd->cq, 1, &wc, NULL);
488                 if (ret == RPMA_E_NO_COMPLETION) {
489                         /* lack of completion is not an error */
490                         continue;
491                 } else if (ret != 0) {
492                         /* an error occurred */
493                         librpma_td_verror(td, ret, "rpma_cq_get_wc");
494                         goto err;
495                 }
496
497                 /* if io_us has completed with an error */
498                 if (wc.status != IBV_WC_SUCCESS)
499                         goto err;
500
501                 if (wc.opcode == IBV_WC_SEND)
502                         ++ccd->op_send_completed;
503                 else {
504                         if (wc.opcode == IBV_WC_RECV)
505                                 ++ccd->op_recv_completed;
506
507                         break;
508                 }
509         } while (1);
510
511         if (ccd->get_io_u_index(&wc, &io_u_index) != 1)
512                 goto err;
513
514         if (io_u->index != io_u_index) {
515                 log_err(
516                         "no matching io_u for received completion found (io_u_index=%u)\n",
517                         io_u_index);
518                 goto err;
519         }
520
521         /* make sure all SENDs are completed before exit - clean up SQ */
522         if (librpma_fio_client_io_complete_all_sends(td))
523                 goto err;
524
525         return FIO_Q_COMPLETED;
526
527 err:
528         io_u->error = -1;
529         return FIO_Q_COMPLETED;
530 }
531
532 enum fio_q_status librpma_fio_client_queue(struct thread_data *td,
533                 struct io_u *io_u)
534 {
535         struct librpma_fio_client_data *ccd = td->io_ops_data;
536
537         if (ccd->io_u_queued_nr == (int)td->o.iodepth)
538                 return FIO_Q_BUSY;
539
540         if (td->o.sync_io)
541                 return client_queue_sync(td, io_u);
542
543         /* io_u -> queued[] */
544         ccd->io_us_queued[ccd->io_u_queued_nr] = io_u;
545         ccd->io_u_queued_nr++;
546
547         return FIO_Q_QUEUED;
548 }
549
550 int librpma_fio_client_commit(struct thread_data *td)
551 {
552         struct librpma_fio_client_data *ccd = td->io_ops_data;
553         int flags = RPMA_F_COMPLETION_ON_ERROR;
554         struct timespec now;
555         bool fill_time;
556         int i;
557         struct io_u *flush_first_io_u = NULL;
558         unsigned long long int flush_len = 0;
559
560         if (!ccd->io_us_queued)
561                 return -1;
562
563         /* execute all io_us from queued[] */
564         for (i = 0; i < ccd->io_u_queued_nr; i++) {
565                 struct io_u *io_u = ccd->io_us_queued[i];
566
567                 if (io_u->ddir == DDIR_READ) {
568                         if (i + 1 == ccd->io_u_queued_nr ||
569                             ccd->io_us_queued[i + 1]->ddir == DDIR_WRITE)
570                                 flags = RPMA_F_COMPLETION_ALWAYS;
571                         /* post an RDMA read operation */
572                         if (librpma_fio_client_io_read(td, io_u, flags))
573                                 return -1;
574                 } else if (io_u->ddir == DDIR_WRITE) {
575                         /* post an RDMA write operation */
576                         if (librpma_fio_client_io_write(td, io_u))
577                                 return -1;
578
579                         /* cache the first io_u in the sequence */
580                         if (flush_first_io_u == NULL)
581                                 flush_first_io_u = io_u;
582
583                         /*
584                          * the flush length is the sum of all io_u's creating
585                          * the sequence
586                          */
587                         flush_len += io_u->xfer_buflen;
588
589                         /*
590                          * if io_u's are random the rpma_flush is required
591                          * after each one of them
592                          */
593                         if (!td_random(td)) {
594                                 /*
595                                  * When the io_u's are sequential and
596                                  * the current io_u is not the last one and
597                                  * the next one is also a write operation
598                                  * the flush can be postponed by one io_u and
599                                  * cover all of them which build a continuous
600                                  * sequence.
601                                  */
602                                 if ((i + 1 < ccd->io_u_queued_nr) &&
603                                     (ccd->io_us_queued[i + 1]->ddir == DDIR_WRITE))
604                                         continue;
605                         }
606
607                         /* flush all writes which build a continuous sequence */
608                         if (ccd->flush(td, flush_first_io_u, io_u, flush_len))
609                                 return -1;
610
611                         /*
612                          * reset the flush parameters in preparation for
613                          * the next one
614                          */
615                         flush_first_io_u = NULL;
616                         flush_len = 0;
617                 } else {
618                         log_err("unsupported IO mode: %s\n",
619                                 io_ddir_name(io_u->ddir));
620                         return -1;
621                 }
622         }
623
624         if ((fill_time = fio_fill_issue_time(td))) {
625                 fio_gettime(&now, NULL);
626
627                 /*
628                  * only used for iolog
629                  */
630                 if (td->o.read_iolog_file)
631                         memcpy(&td->last_issue, &now, sizeof(now));
632
633         }
634         /* move executed io_us from queued[] to flight[] */
635         for (i = 0; i < ccd->io_u_queued_nr; i++) {
636                 struct io_u *io_u = ccd->io_us_queued[i];
637
638                 /* FIO does not do this if the engine is asynchronous */
639                 if (fill_time)
640                         memcpy(&io_u->issue_time, &now, sizeof(now));
641
642                 /* move executed io_us from queued[] to flight[] */
643                 ccd->io_us_flight[ccd->io_u_flight_nr] = io_u;
644                 ccd->io_u_flight_nr++;
645
646                 /*
647                  * FIO says:
648                  * If an engine has the commit hook
649                  * it has to call io_u_queued() itself.
650                  */
651                 io_u_queued(td, io_u);
652         }
653
654         /* FIO does not do this if an engine has the commit hook. */
655         io_u_mark_submit(td, ccd->io_u_queued_nr);
656         ccd->io_u_queued_nr = 0;
657
658         return 0;
659 }
660
661 /*
662  * RETURN VALUE
663  * - > 0  - a number of completed io_us
664  * -   0  - when no complicitions received
665  * - (-1) - when an error occurred
666  */
667 static int client_getevent_process(struct thread_data *td)
668 {
669         struct librpma_fio_client_data *ccd = td->io_ops_data;
670         struct ibv_wc wc;
671         /* io_u->index of completed io_u (wc.wr_id) */
672         unsigned int io_u_index;
673         /* # of completed io_us */
674         int cmpl_num = 0;
675         /* helpers */
676         struct io_u *io_u;
677         int i;
678         int ret;
679
680         /* get a completion */
681         if ((ret = rpma_cq_get_wc(ccd->cq, 1, &wc, NULL))) {
682                 /* lack of completion is not an error */
683                 if (ret == RPMA_E_NO_COMPLETION) {
684                         /* lack of completion is not an error */
685                         return 0;
686                 }
687
688                 /* an error occurred */
689                 librpma_td_verror(td, ret, "rpma_cq_get_wc");
690                 return -1;
691         }
692
693         /* if io_us has completed with an error */
694         if (wc.status != IBV_WC_SUCCESS) {
695                 td->error = wc.status;
696                 return -1;
697         }
698
699         if (wc.opcode == IBV_WC_SEND)
700                 ++ccd->op_send_completed;
701         else if (wc.opcode == IBV_WC_RECV)
702                 ++ccd->op_recv_completed;
703
704         if ((ret = ccd->get_io_u_index(&wc, &io_u_index)) != 1)
705                 return ret;
706
707         /* look for an io_u being completed */
708         for (i = 0; i < ccd->io_u_flight_nr; ++i) {
709                 if (ccd->io_us_flight[i]->index == io_u_index) {
710                         cmpl_num = i + 1;
711                         break;
712                 }
713         }
714
715         /* if no matching io_u has been found */
716         if (cmpl_num == 0) {
717                 log_err(
718                         "no matching io_u for received completion found (io_u_index=%u)\n",
719                         io_u_index);
720                 return -1;
721         }
722
723         /* move completed io_us to the completed in-memory queue */
724         for (i = 0; i < cmpl_num; ++i) {
725                 /* get and prepare io_u */
726                 io_u = ccd->io_us_flight[i];
727
728                 /* append to the queue */
729                 ccd->io_us_completed[ccd->io_u_completed_nr] = io_u;
730                 ccd->io_u_completed_nr++;
731         }
732
733         /* remove completed io_us from the flight queue */
734         for (i = cmpl_num; i < ccd->io_u_flight_nr; ++i)
735                 ccd->io_us_flight[i - cmpl_num] = ccd->io_us_flight[i];
736         ccd->io_u_flight_nr -= cmpl_num;
737
738         return cmpl_num;
739 }
740
741 int librpma_fio_client_getevents(struct thread_data *td, unsigned int min,
742                 unsigned int max, const struct timespec *t)
743 {
744         struct librpma_fio_client_data *ccd = td->io_ops_data;
745         /* total # of completed io_us */
746         int cmpl_num_total = 0;
747         /* # of completed io_us from a single event */
748         int cmpl_num;
749
750         do {
751                 cmpl_num = client_getevent_process(td);
752                 if (cmpl_num > 0) {
753                         /* new completions collected */
754                         cmpl_num_total += cmpl_num;
755                 } else if (cmpl_num == 0) {
756                         /*
757                          * It is required to make sure that CQEs for SENDs
758                          * will flow at least at the same pace as CQEs for RECVs.
759                          */
760                         if (cmpl_num_total >= min &&
761                             ccd->op_send_completed >= ccd->op_recv_completed)
762                                 break;
763
764                         /*
765                          * To reduce CPU consumption one can use
766                          * the rpma_cq_wait() function.
767                          * Note this greatly increase the latency
768                          * and make the results less stable.
769                          * The bandwidth stays more or less the same.
770                          */
771                 } else {
772                         /* an error occurred */
773                         return -1;
774                 }
775
776                 /*
777                  * The expected max can be exceeded if CQEs for RECVs will come up
778                  * faster than CQEs for SENDs. But it is required to make sure CQEs for
779                  * SENDs will flow at least at the same pace as CQEs for RECVs.
780                  */
781         } while (cmpl_num_total < max ||
782                         ccd->op_send_completed < ccd->op_recv_completed);
783
784         /*
785          * All posted SENDs are completed and RECVs for them (responses) are
786          * completed. This is the initial situation so the counters are reset.
787          */
788         if (ccd->op_send_posted == ccd->op_send_completed &&
789                         ccd->op_send_completed == ccd->op_recv_completed) {
790                 ccd->op_send_posted = 0;
791                 ccd->op_send_completed = 0;
792                 ccd->op_recv_completed = 0;
793         }
794
795         return cmpl_num_total;
796 }
797
798 struct io_u *librpma_fio_client_event(struct thread_data *td, int event)
799 {
800         struct librpma_fio_client_data *ccd = td->io_ops_data;
801         struct io_u *io_u;
802         int i;
803
804         /* get the first io_u from the queue */
805         io_u = ccd->io_us_completed[0];
806
807         /* remove the first io_u from the queue */
808         for (i = 1; i < ccd->io_u_completed_nr; ++i)
809                 ccd->io_us_completed[i - 1] = ccd->io_us_completed[i];
810         ccd->io_u_completed_nr--;
811
812         dprint_io_u(io_u, "client_event");
813
814         return io_u;
815 }
816
817 char *librpma_fio_client_errdetails(struct io_u *io_u)
818 {
819         /* get the string representation of an error */
820         enum ibv_wc_status status = io_u->error;
821         const char *status_str = ibv_wc_status_str(status);
822
823         char *details = strdup(status_str);
824         if (details == NULL) {
825                 fprintf(stderr, "Error: %s\n", status_str);
826                 fprintf(stderr, "Fatal error: out of memory. Aborting.\n");
827                 abort();
828         }
829
830         /* FIO frees the returned string when it becomes obsolete */
831         return details;
832 }
833
834 int librpma_fio_server_init(struct thread_data *td)
835 {
836         struct librpma_fio_options_values *o = td->eo;
837         struct librpma_fio_server_data *csd;
838         struct ibv_context *dev = NULL;
839         enum rpma_log_level log_level_aux = RPMA_LOG_LEVEL_WARNING;
840         int ret = -1;
841
842         /* --debug=net sets RPMA_LOG_THRESHOLD_AUX to RPMA_LOG_LEVEL_INFO */
843 #ifdef FIO_INC_DEBUG
844         if ((1UL << FD_NET) & fio_debug)
845                 log_level_aux = RPMA_LOG_LEVEL_INFO;
846 #endif
847
848         /* configure logging thresholds to see more details */
849         rpma_log_set_threshold(RPMA_LOG_THRESHOLD, RPMA_LOG_LEVEL_INFO);
850         rpma_log_set_threshold(RPMA_LOG_THRESHOLD_AUX, log_level_aux);
851
852
853         /* obtain an IBV context for a remote IP address */
854         if ((ret = rpma_utils_get_ibv_context(o->server_ip,
855                         RPMA_UTIL_IBV_CONTEXT_LOCAL, &dev))) {
856                 librpma_td_verror(td, ret, "rpma_utils_get_ibv_context");
857                 return -1;
858         }
859
860         /* allocate server's data */
861         csd = calloc(1, sizeof(*csd));
862         if (csd == NULL) {
863                 td_verror(td, errno, "calloc");
864                 return -1;
865         }
866
867         /* create a new peer object */
868         if ((ret = rpma_peer_new(dev, &csd->peer))) {
869                 librpma_td_verror(td, ret, "rpma_peer_new");
870                 goto err_free_csd;
871         }
872
873         td->io_ops_data = csd;
874
875         return 0;
876
877 err_free_csd:
878         free(csd);
879
880         return -1;
881 }
882
883 void librpma_fio_server_cleanup(struct thread_data *td)
884 {
885         struct librpma_fio_server_data *csd =  td->io_ops_data;
886         int ret;
887
888         if (csd == NULL)
889                 return;
890
891         /* free the peer */
892         if ((ret = rpma_peer_delete(&csd->peer)))
893                 librpma_td_verror(td, ret, "rpma_peer_delete");
894
895         free(csd);
896 }
897
898 int librpma_fio_server_open_file(struct thread_data *td, struct fio_file *f,
899                 struct rpma_conn_cfg *cfg)
900 {
901         struct librpma_fio_server_data *csd = td->io_ops_data;
902         struct librpma_fio_options_values *o = td->eo;
903         enum rpma_conn_event conn_event = RPMA_CONN_UNDEFINED;
904         struct librpma_fio_workspace ws = {0};
905         struct rpma_conn_private_data pdata;
906         uint32_t max_msg_num;
907         struct rpma_conn_req *conn_req;
908         struct rpma_conn *conn;
909         struct rpma_mr_local *mr;
910         char port_td[LIBRPMA_FIO_PORT_STR_LEN_MAX];
911         struct rpma_ep *ep;
912         size_t mem_size = td->o.size;
913         size_t mr_desc_size;
914         void *ws_ptr;
915         bool is_dram;
916         int usage_mem_type;
917         int ret;
918
919         if (!f->file_name) {
920                 log_err("fio: filename is not set\n");
921                 return -1;
922         }
923
924         /* start a listening endpoint at addr:port */
925         if (librpma_fio_td_port(o->port, td, port_td))
926                 return -1;
927
928         if ((ret = rpma_ep_listen(csd->peer, o->server_ip, port_td, &ep))) {
929                 librpma_td_verror(td, ret, "rpma_ep_listen");
930                 return -1;
931         }
932
933         is_dram = !strcmp(f->file_name, "malloc");
934         if (is_dram) {
935                 /* allocation from DRAM using posix_memalign() */
936                 ws_ptr = librpma_fio_allocate_dram(td, mem_size, &csd->mem);
937                 usage_mem_type = RPMA_MR_USAGE_FLUSH_TYPE_VISIBILITY;
938         } else {
939                 /* allocation from PMEM using pmem_map_file() */
940                 ws_ptr = librpma_fio_allocate_pmem(td, f, mem_size, &csd->mem);
941                 usage_mem_type = RPMA_MR_USAGE_FLUSH_TYPE_PERSISTENT;
942         }
943
944         if (ws_ptr == NULL)
945                 goto err_ep_shutdown;
946
947         f->real_file_size = mem_size;
948
949         if ((ret = rpma_mr_reg(csd->peer, ws_ptr, mem_size,
950                         RPMA_MR_USAGE_READ_DST | RPMA_MR_USAGE_READ_SRC |
951                         RPMA_MR_USAGE_WRITE_DST | RPMA_MR_USAGE_WRITE_SRC |
952                         usage_mem_type, &mr))) {
953                 librpma_td_verror(td, ret, "rpma_mr_reg");
954                 goto err_free;
955         }
956
957         if (!is_dram && f->filetype == FIO_TYPE_FILE) {
958                 ret = rpma_mr_advise(mr, 0, mem_size,
959                                 IBV_ADVISE_MR_ADVICE_PREFETCH_WRITE,
960                                 IBV_ADVISE_MR_FLAG_FLUSH);
961                 if (ret) {
962                         librpma_td_verror(td, ret, "rpma_mr_advise");
963                         /* an invalid argument is an error */
964                         if (ret == RPMA_E_INVAL)
965                                 goto err_mr_dereg;
966
967                         /* log_err used instead of log_info to avoid corruption of the JSON output */
968                         log_err("Note: having rpma_mr_advise(3) failed because of RPMA_E_NOSUPP or RPMA_E_PROVIDER may come with a performance penalty, but it is not a blocker for running the benchmark.\n");
969                 }
970         }
971
972         /* get size of the memory region's descriptor */
973         if ((ret = rpma_mr_get_descriptor_size(mr, &mr_desc_size))) {
974                 librpma_td_verror(td, ret, "rpma_mr_get_descriptor_size");
975                 goto err_mr_dereg;
976         }
977
978         /* verify size of the memory region's descriptor */
979         if (mr_desc_size > LIBRPMA_FIO_DESCRIPTOR_MAX_SIZE) {
980                 log_err(
981                         "size of the memory region's descriptor is too big (max=%i)\n",
982                         LIBRPMA_FIO_DESCRIPTOR_MAX_SIZE);
983                 goto err_mr_dereg;
984         }
985
986         /* get the memory region's descriptor */
987         if ((ret = rpma_mr_get_descriptor(mr, &ws.descriptor[0]))) {
988                 librpma_td_verror(td, ret, "rpma_mr_get_descriptor");
989                 goto err_mr_dereg;
990         }
991
992         if (cfg != NULL) {
993                 if ((ret = rpma_conn_cfg_get_rq_size(cfg, &max_msg_num))) {
994                         librpma_td_verror(td, ret, "rpma_conn_cfg_get_rq_size");
995                         goto err_mr_dereg;
996                 }
997
998                 /* verify whether iodepth fits into uint16_t */
999                 if (max_msg_num > UINT16_MAX) {
1000                         log_err("fio: iodepth too big (%u > %u)\n",
1001                                 max_msg_num, UINT16_MAX);
1002                         return -1;
1003                 }
1004
1005                 ws.max_msg_num = max_msg_num;
1006         }
1007
1008         /* prepare a workspace description */
1009         ws.direct_write_to_pmem = o->direct_write_to_pmem;
1010         ws.mr_desc_size = mr_desc_size;
1011         pdata.ptr = &ws;
1012         pdata.len = sizeof(ws);
1013
1014         /* receive an incoming connection request */
1015         if ((ret = rpma_ep_next_conn_req(ep, cfg, &conn_req))) {
1016                 librpma_td_verror(td, ret, "rpma_ep_next_conn_req");
1017                 goto err_mr_dereg;
1018         }
1019
1020         if (csd->prepare_connection && csd->prepare_connection(td, conn_req))
1021                 goto err_req_delete;
1022
1023         /* accept the connection request and obtain the connection object */
1024         if ((ret = rpma_conn_req_connect(&conn_req, &pdata, &conn))) {
1025                 librpma_td_verror(td, ret, "rpma_conn_req_connect");
1026                 goto err_req_delete;
1027         }
1028
1029         /* wait for the connection to be established */
1030         if ((ret = rpma_conn_next_event(conn, &conn_event))) {
1031                 librpma_td_verror(td, ret, "rpma_conn_next_event");
1032                 goto err_conn_delete;
1033         } else if (conn_event != RPMA_CONN_ESTABLISHED) {
1034                 log_err("rpma_conn_next_event returned an unexptected event\n");
1035                 goto err_conn_delete;
1036         }
1037
1038         /* end-point is no longer needed */
1039         (void) rpma_ep_shutdown(&ep);
1040
1041         csd->ws_mr = mr;
1042         csd->ws_ptr = ws_ptr;
1043         csd->conn = conn;
1044
1045         /* get the connection's main CQ */
1046         if ((ret = rpma_conn_get_cq(csd->conn, &csd->cq))) {
1047                 librpma_td_verror(td, ret, "rpma_conn_get_cq");
1048                 goto err_conn_delete;
1049         }
1050
1051         return 0;
1052
1053 err_conn_delete:
1054         (void) rpma_conn_delete(&conn);
1055
1056 err_req_delete:
1057         (void) rpma_conn_req_delete(&conn_req);
1058
1059 err_mr_dereg:
1060         (void) rpma_mr_dereg(&mr);
1061
1062 err_free:
1063         librpma_fio_free(&csd->mem);
1064
1065 err_ep_shutdown:
1066         (void) rpma_ep_shutdown(&ep);
1067
1068         return -1;
1069 }
1070
1071 int librpma_fio_server_close_file(struct thread_data *td, struct fio_file *f)
1072 {
1073         struct librpma_fio_server_data *csd = td->io_ops_data;
1074         enum rpma_conn_event conn_event = RPMA_CONN_UNDEFINED;
1075         int rv = 0;
1076         int ret;
1077
1078         /* wait for the connection to be closed */
1079         ret = rpma_conn_next_event(csd->conn, &conn_event);
1080         if (!ret && conn_event != RPMA_CONN_CLOSED) {
1081                 log_err("rpma_conn_next_event returned an unexptected event\n");
1082                 rv = -1;
1083         }
1084
1085         if ((ret = rpma_conn_disconnect(csd->conn))) {
1086                 librpma_td_verror(td, ret, "rpma_conn_disconnect");
1087                 rv = -1;
1088         }
1089
1090         if ((ret = rpma_conn_delete(&csd->conn))) {
1091                 librpma_td_verror(td, ret, "rpma_conn_delete");
1092                 rv = -1;
1093         }
1094
1095         if ((ret = rpma_mr_dereg(&csd->ws_mr))) {
1096                 librpma_td_verror(td, ret, "rpma_mr_dereg");
1097                 rv = -1;
1098         }
1099
1100         librpma_fio_free(&csd->mem);
1101
1102         return rv;
1103 }