Merge branch 'master' of https://github.com/celestinechen/fio
[fio.git] / engines / librpma_fio.c
1 /*
2  * librpma_fio: librpma_apm and librpma_gpspm engines' common part.
3  *
4  * Copyright 2021-2022, Intel Corporation
5  *
6  * This program is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License,
8  * version 2 as published by the Free Software Foundation..
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13  * GNU General Public License for more details.
14  */
15
16 #ifdef CONFIG_LIBPMEM2_INSTALLED
17 #include "librpma_fio_pmem2.h"
18 #else
19 #include "librpma_fio_pmem.h"
20 #endif /* CONFIG_LIBPMEM2_INSTALLED */
21
22 struct fio_option librpma_fio_options[] = {
23         {
24                 .name   = "serverip",
25                 .lname  = "rpma_server_ip",
26                 .type   = FIO_OPT_STR_STORE,
27                 .off1   = offsetof(struct librpma_fio_options_values, server_ip),
28                 .help   = "IP address the server is listening on",
29                 .def    = "",
30                 .category = FIO_OPT_C_ENGINE,
31                 .group  = FIO_OPT_G_LIBRPMA,
32         },
33         {
34                 .name   = "port",
35                 .lname  = "rpma_server port",
36                 .type   = FIO_OPT_STR_STORE,
37                 .off1   = offsetof(struct librpma_fio_options_values, port),
38                 .help   = "port the server is listening on",
39                 .def    = "7204",
40                 .category = FIO_OPT_C_ENGINE,
41                 .group  = FIO_OPT_G_LIBRPMA,
42         },
43         {
44                 .name   = "direct_write_to_pmem",
45                 .lname  = "Direct Write to PMem (via RDMA) from the remote host is possible",
46                 .type   = FIO_OPT_BOOL,
47                 .off1   = offsetof(struct librpma_fio_options_values,
48                                         direct_write_to_pmem),
49                 .help   = "Set to true ONLY when Direct Write to PMem from the remote host is possible (https://pmem.io/rpma/documentation/basic-direct-write-to-pmem.html)",
50                 .def    = "",
51                 .category = FIO_OPT_C_ENGINE,
52                 .group  = FIO_OPT_G_LIBRPMA,
53         },
54         {
55                 .name   = "busy_wait_polling",
56                 .lname  = "Set to 0 to wait for completion instead of busy-wait polling completion.",
57                 .type   = FIO_OPT_BOOL,
58                 .off1   = offsetof(struct librpma_fio_options_values,
59                                         busy_wait_polling),
60                 .help   = "Set to false if you want to reduce CPU usage",
61                 .def    = "1",
62                 .category = FIO_OPT_C_ENGINE,
63                 .group  = FIO_OPT_G_LIBRPMA,
64         },
65         {
66                 .name   = NULL,
67         },
68 };
69
70 int librpma_fio_td_port(const char *port_base_str, struct thread_data *td,
71                 char *port_out)
72 {
73         unsigned long int port_ul = strtoul(port_base_str, NULL, 10);
74         unsigned int port_new;
75
76         port_out[0] = '\0';
77
78         if (port_ul == ULONG_MAX) {
79                 td_verror(td, errno, "strtoul");
80                 return -1;
81         }
82         port_ul += td->thread_number - 1;
83         if (port_ul >= UINT_MAX) {
84                 log_err("[%u] port number (%lu) bigger than UINT_MAX\n",
85                         td->thread_number, port_ul);
86                 return -1;
87         }
88
89         port_new = port_ul;
90         snprintf(port_out, LIBRPMA_FIO_PORT_STR_LEN_MAX - 1, "%u", port_new);
91
92         return 0;
93 }
94
95 char *librpma_fio_allocate_dram(struct thread_data *td, size_t size,
96         struct librpma_fio_mem *mem)
97 {
98         char *mem_ptr = NULL;
99         int ret;
100
101         if ((ret = posix_memalign((void **)&mem_ptr, page_size, size))) {
102                 log_err("fio: posix_memalign() failed\n");
103                 td_verror(td, ret, "posix_memalign");
104                 return NULL;
105         }
106
107         mem->mem_ptr = mem_ptr;
108         mem->size_mmap = 0;
109
110         return mem_ptr;
111 }
112
113 char *librpma_fio_allocate_pmem(struct thread_data *td, struct fio_file *f,
114                 size_t size, struct librpma_fio_mem *mem)
115 {
116         size_t ws_offset;
117         mem->mem_ptr = NULL;
118
119         if (size % page_size) {
120                 log_err("fio: size (%zu) is not aligned to page size (%zu)\n",
121                         size, page_size);
122                 return NULL;
123         }
124
125         if (f->filetype == FIO_TYPE_CHAR) {
126                 /* Each thread uses a separate offset within DeviceDAX. */
127                 ws_offset = (td->thread_number - 1) * size;
128         } else {
129                 /* Each thread uses a separate FileSystemDAX file. No offset is needed. */
130                 ws_offset = 0;
131         }
132
133         if (!f->file_name) {
134                 log_err("fio: filename is not set\n");
135                 return NULL;
136         }
137
138         if (librpma_fio_pmem_map_file(f, size, mem, ws_offset)) {
139                 log_err("fio: librpma_fio_pmem_map_file(%s) failed\n",
140                         f->file_name);
141                 return NULL;
142         }
143
144         log_info("fio: size of memory mapped from the file %s: %zu\n",
145                 f->file_name, mem->size_mmap);
146
147         log_info("fio: library used to map PMem from file: %s\n", RPMA_PMEM_USED);
148
149         return mem->mem_ptr ? mem->mem_ptr + ws_offset : NULL;
150 }
151
152 void librpma_fio_free(struct librpma_fio_mem *mem)
153 {
154         if (mem->size_mmap)
155                 librpma_fio_unmap(mem);
156         else
157                 free(mem->mem_ptr);
158 }
159
160 #define LIBRPMA_FIO_RETRY_MAX_NO        10
161 #define LIBRPMA_FIO_RETRY_DELAY_S       5
162
163 int librpma_fio_client_init(struct thread_data *td,
164                 struct rpma_conn_cfg *cfg)
165 {
166         struct librpma_fio_client_data *ccd;
167         struct librpma_fio_options_values *o = td->eo;
168         struct ibv_context *dev = NULL;
169         char port_td[LIBRPMA_FIO_PORT_STR_LEN_MAX];
170         struct rpma_conn_req *req = NULL;
171         enum rpma_conn_event event;
172         struct rpma_conn_private_data pdata;
173         enum rpma_log_level log_level_aux = RPMA_LOG_LEVEL_WARNING;
174         int remote_flush_type;
175         int retry;
176         int ret;
177
178         /* --debug=net sets RPMA_LOG_THRESHOLD_AUX to RPMA_LOG_LEVEL_INFO */
179 #ifdef FIO_INC_DEBUG
180         if ((1UL << FD_NET) & fio_debug)
181                 log_level_aux = RPMA_LOG_LEVEL_INFO;
182 #endif
183
184         /* configure logging thresholds to see more details */
185         rpma_log_set_threshold(RPMA_LOG_THRESHOLD, RPMA_LOG_LEVEL_INFO);
186         rpma_log_set_threshold(RPMA_LOG_THRESHOLD_AUX, log_level_aux);
187
188         /* obtain an IBV context for a remote IP address */
189         if ((ret = rpma_utils_get_ibv_context(o->server_ip,
190                         RPMA_UTIL_IBV_CONTEXT_REMOTE, &dev))) {
191                 librpma_td_verror(td, ret, "rpma_utils_get_ibv_context");
192                 return -1;
193         }
194
195         /* allocate client's data */
196         ccd = calloc(1, sizeof(*ccd));
197         if (ccd == NULL) {
198                 td_verror(td, errno, "calloc");
199                 return -1;
200         }
201
202         /* allocate all in-memory queues */
203         ccd->io_us_queued = calloc(td->o.iodepth, sizeof(*ccd->io_us_queued));
204         if (ccd->io_us_queued == NULL) {
205                 td_verror(td, errno, "calloc");
206                 goto err_free_ccd;
207         }
208
209         ccd->io_us_flight = calloc(td->o.iodepth, sizeof(*ccd->io_us_flight));
210         if (ccd->io_us_flight == NULL) {
211                 td_verror(td, errno, "calloc");
212                 goto err_free_io_u_queues;
213         }
214
215         ccd->io_us_completed = calloc(td->o.iodepth,
216                         sizeof(*ccd->io_us_completed));
217         if (ccd->io_us_completed == NULL) {
218                 td_verror(td, errno, "calloc");
219                 goto err_free_io_u_queues;
220         }
221
222         /* create a new peer object */
223         if ((ret = rpma_peer_new(dev, &ccd->peer))) {
224                 librpma_td_verror(td, ret, "rpma_peer_new");
225                 goto err_free_io_u_queues;
226         }
227
228         /* create a connection request */
229         if (librpma_fio_td_port(o->port, td, port_td))
230                 goto err_peer_delete;
231
232         for (retry = 0; retry < LIBRPMA_FIO_RETRY_MAX_NO; retry++) {
233                 if ((ret = rpma_conn_req_new(ccd->peer, o->server_ip, port_td,
234                                 cfg, &req))) {
235                         librpma_td_verror(td, ret, "rpma_conn_req_new");
236                         goto err_peer_delete;
237                 }
238
239                 /*
240                  * Connect the connection request
241                  * and obtain the connection object.
242                  */
243                 if ((ret = rpma_conn_req_connect(&req, NULL, &ccd->conn))) {
244                         librpma_td_verror(td, ret, "rpma_conn_req_connect");
245                         goto err_req_delete;
246                 }
247
248                 /* wait for the connection to establish */
249                 if ((ret = rpma_conn_next_event(ccd->conn, &event))) {
250                         librpma_td_verror(td, ret, "rpma_conn_next_event");
251                         goto err_conn_delete;
252                 } else if (event == RPMA_CONN_ESTABLISHED) {
253                         break;
254                 } else if (event == RPMA_CONN_REJECTED) {
255                         (void) rpma_conn_disconnect(ccd->conn);
256                         (void) rpma_conn_delete(&ccd->conn);
257                         if (retry < LIBRPMA_FIO_RETRY_MAX_NO - 1) {
258                                 log_err("Thread [%d]: Retrying (#%i) ...\n",
259                                         td->thread_number, retry + 1);
260                                 sleep(LIBRPMA_FIO_RETRY_DELAY_S);
261                         } else {
262                                 log_err(
263                                         "Thread [%d]: The maximum number of retries exceeded. Closing.\n",
264                                         td->thread_number);
265                         }
266                 } else {
267                         log_err(
268                                 "rpma_conn_next_event returned an unexptected event: (%s != RPMA_CONN_ESTABLISHED)\n",
269                                 rpma_utils_conn_event_2str(event));
270                         goto err_conn_delete;
271                 }
272         }
273
274         if (retry > 0)
275                 log_err("Thread [%d]: Connected after retry #%i\n",
276                         td->thread_number, retry);
277
278         if (ccd->conn == NULL)
279                 goto err_peer_delete;
280
281         /* get the connection's main CQ */
282         if ((ret = rpma_conn_get_cq(ccd->conn, &ccd->cq))) {
283                 librpma_td_verror(td, ret, "rpma_conn_get_cq");
284                 goto err_conn_delete;
285         }
286
287         /* get the connection's private data sent from the server */
288         if ((ret = rpma_conn_get_private_data(ccd->conn, &pdata))) {
289                 librpma_td_verror(td, ret, "rpma_conn_get_private_data");
290                 goto err_conn_delete;
291         }
292
293         /* get the server's workspace representation */
294         ccd->ws = pdata.ptr;
295
296         /* create the server's memory representation */
297         if ((ret = rpma_mr_remote_from_descriptor(&ccd->ws->descriptor[0],
298                         ccd->ws->mr_desc_size, &ccd->server_mr))) {
299                 librpma_td_verror(td, ret, "rpma_mr_remote_from_descriptor");
300                 goto err_conn_delete;
301         }
302
303         /* get the total size of the shared server memory */
304         if ((ret = rpma_mr_remote_get_size(ccd->server_mr, &ccd->ws_size))) {
305                 librpma_td_verror(td, ret, "rpma_mr_remote_get_size");
306                 goto err_conn_delete;
307         }
308
309         /* get flush type of the remote node */
310         if ((ret = rpma_mr_remote_get_flush_type(ccd->server_mr,
311                         &remote_flush_type))) {
312                 librpma_td_verror(td, ret, "rpma_mr_remote_get_flush_type");
313                 goto err_conn_delete;
314         }
315
316         ccd->server_mr_flush_type =
317                 (remote_flush_type & RPMA_MR_USAGE_FLUSH_TYPE_PERSISTENT) ?
318                 RPMA_FLUSH_TYPE_PERSISTENT : RPMA_FLUSH_TYPE_VISIBILITY;
319
320         /*
321          * Assure an io_us buffer allocation is page-size-aligned which is required
322          * to register for RDMA. User-provided value is intentionally ignored.
323          */
324         td->o.mem_align = page_size;
325
326         td->io_ops_data = ccd;
327
328         return 0;
329
330 err_conn_delete:
331         (void) rpma_conn_disconnect(ccd->conn);
332         (void) rpma_conn_delete(&ccd->conn);
333
334 err_req_delete:
335         (void) rpma_conn_req_delete(&req);
336
337 err_peer_delete:
338         (void) rpma_peer_delete(&ccd->peer);
339
340 err_free_io_u_queues:
341         free(ccd->io_us_queued);
342         free(ccd->io_us_flight);
343         free(ccd->io_us_completed);
344
345 err_free_ccd:
346         free(ccd);
347
348         return -1;
349 }
350
351 void librpma_fio_client_cleanup(struct thread_data *td)
352 {
353         struct librpma_fio_client_data *ccd = td->io_ops_data;
354         enum rpma_conn_event ev;
355         int ret;
356
357         if (ccd == NULL)
358                 return;
359
360         /* delete the iou's memory registration */
361         if ((ret = rpma_mr_dereg(&ccd->orig_mr)))
362                 librpma_td_verror(td, ret, "rpma_mr_dereg");
363         /* delete the iou's memory registration */
364         if ((ret = rpma_mr_remote_delete(&ccd->server_mr)))
365                 librpma_td_verror(td, ret, "rpma_mr_remote_delete");
366         /* initiate disconnection */
367         if ((ret = rpma_conn_disconnect(ccd->conn)))
368                 librpma_td_verror(td, ret, "rpma_conn_disconnect");
369         /* wait for disconnection to end up */
370         if ((ret = rpma_conn_next_event(ccd->conn, &ev))) {
371                 librpma_td_verror(td, ret, "rpma_conn_next_event");
372         } else if (ev != RPMA_CONN_CLOSED) {
373                 log_err(
374                         "client_cleanup received an unexpected event (%s != RPMA_CONN_CLOSED)\n",
375                         rpma_utils_conn_event_2str(ev));
376         }
377         /* delete the connection */
378         if ((ret = rpma_conn_delete(&ccd->conn)))
379                 librpma_td_verror(td, ret, "rpma_conn_delete");
380         /* delete the peer */
381         if ((ret = rpma_peer_delete(&ccd->peer)))
382                 librpma_td_verror(td, ret, "rpma_peer_delete");
383         /* free the software queues */
384         free(ccd->io_us_queued);
385         free(ccd->io_us_flight);
386         free(ccd->io_us_completed);
387         free(ccd);
388         td->io_ops_data = NULL; /* zero ccd */
389 }
390
391 int librpma_fio_file_nop(struct thread_data *td, struct fio_file *f)
392 {
393         /* NOP */
394         return 0;
395 }
396
397 int librpma_fio_client_post_init(struct thread_data *td)
398 {
399         struct librpma_fio_client_data *ccd =  td->io_ops_data;
400         size_t io_us_size;
401         int ret;
402
403         /*
404          * td->orig_buffer is not aligned. The engine requires aligned io_us
405          * so FIO aligns up the address using the formula below.
406          */
407         ccd->orig_buffer_aligned = PTR_ALIGN(td->orig_buffer, page_mask) +
408                         td->o.mem_align;
409
410         /*
411          * td->orig_buffer_size beside the space really consumed by io_us
412          * has paddings which can be omitted for the memory registration.
413          */
414         io_us_size = (unsigned long long)td_max_bs(td) *
415                         (unsigned long long)td->o.iodepth;
416
417         if ((ret = rpma_mr_reg(ccd->peer, ccd->orig_buffer_aligned, io_us_size,
418                         RPMA_MR_USAGE_READ_DST | RPMA_MR_USAGE_READ_SRC |
419                         RPMA_MR_USAGE_WRITE_DST | RPMA_MR_USAGE_WRITE_SRC |
420                         RPMA_MR_USAGE_FLUSH_TYPE_PERSISTENT, &ccd->orig_mr)))
421                 librpma_td_verror(td, ret, "rpma_mr_reg");
422         return ret;
423 }
424
425 int librpma_fio_client_get_file_size(struct thread_data *td,
426                 struct fio_file *f)
427 {
428         struct librpma_fio_client_data *ccd = td->io_ops_data;
429
430         f->real_file_size = ccd->ws_size;
431         fio_file_set_size_known(f);
432
433         return 0;
434 }
435
436 static enum fio_q_status client_queue_sync(struct thread_data *td,
437                 struct io_u *io_u)
438 {
439         struct librpma_fio_client_data *ccd = td->io_ops_data;
440         struct ibv_wc wc;
441         unsigned io_u_index;
442         int ret;
443
444         /* execute io_u */
445         if (io_u->ddir == DDIR_READ) {
446                 /* post an RDMA read operation */
447                 if (librpma_fio_client_io_read(td, io_u,
448                                 RPMA_F_COMPLETION_ALWAYS))
449                         goto err;
450         } else if (io_u->ddir == DDIR_WRITE) {
451                 /* post an RDMA write operation */
452                 if (librpma_fio_client_io_write(td, io_u))
453                         goto err;
454                 if (ccd->flush(td, io_u, io_u, io_u->xfer_buflen))
455                         goto err;
456         } else {
457                 log_err("unsupported IO mode: %s\n", io_ddir_name(io_u->ddir));
458                 goto err;
459         }
460
461         do {
462                 /* get a completion */
463                 ret = rpma_cq_get_wc(ccd->cq, 1, &wc, NULL);
464                 if (ret == RPMA_E_NO_COMPLETION) {
465                         /* lack of completion is not an error */
466                         continue;
467                 } else if (ret != 0) {
468                         /* an error occurred */
469                         librpma_td_verror(td, ret, "rpma_cq_get_wc");
470                         goto err;
471                 }
472
473                 /* if io_us has completed with an error */
474                 if (wc.status != IBV_WC_SUCCESS)
475                         goto err;
476
477                 if (wc.opcode == IBV_WC_SEND)
478                         ++ccd->op_send_completed;
479                 else {
480                         if (wc.opcode == IBV_WC_RECV)
481                                 ++ccd->op_recv_completed;
482
483                         break;
484                 }
485         } while (1);
486
487         if (ccd->get_io_u_index(&wc, &io_u_index) != 1)
488                 goto err;
489
490         if (io_u->index != io_u_index) {
491                 log_err(
492                         "no matching io_u for received completion found (io_u_index=%u)\n",
493                         io_u_index);
494                 goto err;
495         }
496
497         /* make sure all SENDs are completed before exit - clean up SQ */
498         if (librpma_fio_client_io_complete_all_sends(td))
499                 goto err;
500
501         return FIO_Q_COMPLETED;
502
503 err:
504         io_u->error = -1;
505         return FIO_Q_COMPLETED;
506 }
507
508 enum fio_q_status librpma_fio_client_queue(struct thread_data *td,
509                 struct io_u *io_u)
510 {
511         struct librpma_fio_client_data *ccd = td->io_ops_data;
512
513         if (ccd->io_u_queued_nr == (int)td->o.iodepth)
514                 return FIO_Q_BUSY;
515
516         if (td->o.sync_io)
517                 return client_queue_sync(td, io_u);
518
519         /* io_u -> queued[] */
520         ccd->io_us_queued[ccd->io_u_queued_nr] = io_u;
521         ccd->io_u_queued_nr++;
522
523         return FIO_Q_QUEUED;
524 }
525
526 int librpma_fio_client_commit(struct thread_data *td)
527 {
528         struct librpma_fio_client_data *ccd = td->io_ops_data;
529         int flags = RPMA_F_COMPLETION_ON_ERROR;
530         struct timespec now;
531         bool fill_time;
532         int i;
533         struct io_u *flush_first_io_u = NULL;
534         unsigned long long int flush_len = 0;
535
536         if (!ccd->io_us_queued)
537                 return -1;
538
539         /* execute all io_us from queued[] */
540         for (i = 0; i < ccd->io_u_queued_nr; i++) {
541                 struct io_u *io_u = ccd->io_us_queued[i];
542
543                 if (io_u->ddir == DDIR_READ) {
544                         if (i + 1 == ccd->io_u_queued_nr ||
545                             ccd->io_us_queued[i + 1]->ddir == DDIR_WRITE)
546                                 flags = RPMA_F_COMPLETION_ALWAYS;
547                         /* post an RDMA read operation */
548                         if (librpma_fio_client_io_read(td, io_u, flags))
549                                 return -1;
550                 } else if (io_u->ddir == DDIR_WRITE) {
551                         /* post an RDMA write operation */
552                         if (librpma_fio_client_io_write(td, io_u))
553                                 return -1;
554
555                         /* cache the first io_u in the sequence */
556                         if (flush_first_io_u == NULL)
557                                 flush_first_io_u = io_u;
558
559                         /*
560                          * the flush length is the sum of all io_u's creating
561                          * the sequence
562                          */
563                         flush_len += io_u->xfer_buflen;
564
565                         /*
566                          * if io_u's are random the rpma_flush is required
567                          * after each one of them
568                          */
569                         if (!td_random(td)) {
570                                 /*
571                                  * When the io_u's are sequential and
572                                  * the current io_u is not the last one and
573                                  * the next one is also a write operation
574                                  * the flush can be postponed by one io_u and
575                                  * cover all of them which build a continuous
576                                  * sequence.
577                                  */
578                                 if ((i + 1 < ccd->io_u_queued_nr) &&
579                                     (ccd->io_us_queued[i + 1]->ddir == DDIR_WRITE))
580                                         continue;
581                         }
582
583                         /* flush all writes which build a continuous sequence */
584                         if (ccd->flush(td, flush_first_io_u, io_u, flush_len))
585                                 return -1;
586
587                         /*
588                          * reset the flush parameters in preparation for
589                          * the next one
590                          */
591                         flush_first_io_u = NULL;
592                         flush_len = 0;
593                 } else {
594                         log_err("unsupported IO mode: %s\n",
595                                 io_ddir_name(io_u->ddir));
596                         return -1;
597                 }
598         }
599
600         if ((fill_time = fio_fill_issue_time(td))) {
601                 fio_gettime(&now, NULL);
602
603                 /*
604                  * only used for iolog
605                  */
606                 if (td->o.read_iolog_file)
607                         memcpy(&td->last_issue, &now, sizeof(now));
608
609         }
610         /* move executed io_us from queued[] to flight[] */
611         for (i = 0; i < ccd->io_u_queued_nr; i++) {
612                 struct io_u *io_u = ccd->io_us_queued[i];
613
614                 /* FIO does not do this if the engine is asynchronous */
615                 if (fill_time)
616                         memcpy(&io_u->issue_time, &now, sizeof(now));
617
618                 /* move executed io_us from queued[] to flight[] */
619                 ccd->io_us_flight[ccd->io_u_flight_nr] = io_u;
620                 ccd->io_u_flight_nr++;
621
622                 /*
623                  * FIO says:
624                  * If an engine has the commit hook
625                  * it has to call io_u_queued() itself.
626                  */
627                 io_u_queued(td, io_u);
628         }
629
630         /* FIO does not do this if an engine has the commit hook. */
631         io_u_mark_submit(td, ccd->io_u_queued_nr);
632         ccd->io_u_queued_nr = 0;
633
634         return 0;
635 }
636
637 /*
638  * RETURN VALUE
639  * - > 0  - a number of completed io_us
640  * -   0  - when no complicitions received
641  * - (-1) - when an error occurred
642  */
643 static int client_getevent_process(struct thread_data *td)
644 {
645         struct librpma_fio_client_data *ccd = td->io_ops_data;
646         struct ibv_wc wc;
647         /* io_u->index of completed io_u (wc.wr_id) */
648         unsigned int io_u_index;
649         /* # of completed io_us */
650         int cmpl_num = 0;
651         /* helpers */
652         struct io_u *io_u;
653         int i;
654         int ret;
655
656         /* get a completion */
657         if ((ret = rpma_cq_get_wc(ccd->cq, 1, &wc, NULL))) {
658                 /* lack of completion is not an error */
659                 if (ret == RPMA_E_NO_COMPLETION) {
660                         /* lack of completion is not an error */
661                         return 0;
662                 }
663
664                 /* an error occurred */
665                 librpma_td_verror(td, ret, "rpma_cq_get_wc");
666                 return -1;
667         }
668
669         /* if io_us has completed with an error */
670         if (wc.status != IBV_WC_SUCCESS) {
671                 td->error = wc.status;
672                 return -1;
673         }
674
675         if (wc.opcode == IBV_WC_SEND)
676                 ++ccd->op_send_completed;
677         else if (wc.opcode == IBV_WC_RECV)
678                 ++ccd->op_recv_completed;
679
680         if ((ret = ccd->get_io_u_index(&wc, &io_u_index)) != 1)
681                 return ret;
682
683         /* look for an io_u being completed */
684         for (i = 0; i < ccd->io_u_flight_nr; ++i) {
685                 if (ccd->io_us_flight[i]->index == io_u_index) {
686                         cmpl_num = i + 1;
687                         break;
688                 }
689         }
690
691         /* if no matching io_u has been found */
692         if (cmpl_num == 0) {
693                 log_err(
694                         "no matching io_u for received completion found (io_u_index=%u)\n",
695                         io_u_index);
696                 return -1;
697         }
698
699         /* move completed io_us to the completed in-memory queue */
700         for (i = 0; i < cmpl_num; ++i) {
701                 /* get and prepare io_u */
702                 io_u = ccd->io_us_flight[i];
703
704                 /* append to the queue */
705                 ccd->io_us_completed[ccd->io_u_completed_nr] = io_u;
706                 ccd->io_u_completed_nr++;
707         }
708
709         /* remove completed io_us from the flight queue */
710         for (i = cmpl_num; i < ccd->io_u_flight_nr; ++i)
711                 ccd->io_us_flight[i - cmpl_num] = ccd->io_us_flight[i];
712         ccd->io_u_flight_nr -= cmpl_num;
713
714         return cmpl_num;
715 }
716
717 int librpma_fio_client_getevents(struct thread_data *td, unsigned int min,
718                 unsigned int max, const struct timespec *t)
719 {
720         struct librpma_fio_client_data *ccd = td->io_ops_data;
721         /* total # of completed io_us */
722         int cmpl_num_total = 0;
723         /* # of completed io_us from a single event */
724         int cmpl_num;
725
726         do {
727                 cmpl_num = client_getevent_process(td);
728                 if (cmpl_num > 0) {
729                         /* new completions collected */
730                         cmpl_num_total += cmpl_num;
731                 } else if (cmpl_num == 0) {
732                         /*
733                          * It is required to make sure that CQEs for SENDs
734                          * will flow at least at the same pace as CQEs for RECVs.
735                          */
736                         if (cmpl_num_total >= min &&
737                             ccd->op_send_completed >= ccd->op_recv_completed)
738                                 break;
739
740                         /*
741                          * To reduce CPU consumption one can use
742                          * the rpma_cq_wait() function.
743                          * Note this greatly increase the latency
744                          * and make the results less stable.
745                          * The bandwidth stays more or less the same.
746                          */
747                 } else {
748                         /* an error occurred */
749                         return -1;
750                 }
751
752                 /*
753                  * The expected max can be exceeded if CQEs for RECVs will come up
754                  * faster than CQEs for SENDs. But it is required to make sure CQEs for
755                  * SENDs will flow at least at the same pace as CQEs for RECVs.
756                  */
757         } while (cmpl_num_total < max ||
758                         ccd->op_send_completed < ccd->op_recv_completed);
759
760         /*
761          * All posted SENDs are completed and RECVs for them (responses) are
762          * completed. This is the initial situation so the counters are reset.
763          */
764         if (ccd->op_send_posted == ccd->op_send_completed &&
765                         ccd->op_send_completed == ccd->op_recv_completed) {
766                 ccd->op_send_posted = 0;
767                 ccd->op_send_completed = 0;
768                 ccd->op_recv_completed = 0;
769         }
770
771         return cmpl_num_total;
772 }
773
774 struct io_u *librpma_fio_client_event(struct thread_data *td, int event)
775 {
776         struct librpma_fio_client_data *ccd = td->io_ops_data;
777         struct io_u *io_u;
778         int i;
779
780         /* get the first io_u from the queue */
781         io_u = ccd->io_us_completed[0];
782
783         /* remove the first io_u from the queue */
784         for (i = 1; i < ccd->io_u_completed_nr; ++i)
785                 ccd->io_us_completed[i - 1] = ccd->io_us_completed[i];
786         ccd->io_u_completed_nr--;
787
788         dprint_io_u(io_u, "client_event");
789
790         return io_u;
791 }
792
793 char *librpma_fio_client_errdetails(struct io_u *io_u)
794 {
795         /* get the string representation of an error */
796         enum ibv_wc_status status = io_u->error;
797         const char *status_str = ibv_wc_status_str(status);
798
799         char *details = strdup(status_str);
800         if (details == NULL) {
801                 fprintf(stderr, "Error: %s\n", status_str);
802                 fprintf(stderr, "Fatal error: out of memory. Aborting.\n");
803                 abort();
804         }
805
806         /* FIO frees the returned string when it becomes obsolete */
807         return details;
808 }
809
810 int librpma_fio_server_init(struct thread_data *td)
811 {
812         struct librpma_fio_options_values *o = td->eo;
813         struct librpma_fio_server_data *csd;
814         struct ibv_context *dev = NULL;
815         enum rpma_log_level log_level_aux = RPMA_LOG_LEVEL_WARNING;
816         int ret = -1;
817
818         /* --debug=net sets RPMA_LOG_THRESHOLD_AUX to RPMA_LOG_LEVEL_INFO */
819 #ifdef FIO_INC_DEBUG
820         if ((1UL << FD_NET) & fio_debug)
821                 log_level_aux = RPMA_LOG_LEVEL_INFO;
822 #endif
823
824         /* configure logging thresholds to see more details */
825         rpma_log_set_threshold(RPMA_LOG_THRESHOLD, RPMA_LOG_LEVEL_INFO);
826         rpma_log_set_threshold(RPMA_LOG_THRESHOLD_AUX, log_level_aux);
827
828
829         /* obtain an IBV context for a remote IP address */
830         if ((ret = rpma_utils_get_ibv_context(o->server_ip,
831                         RPMA_UTIL_IBV_CONTEXT_LOCAL, &dev))) {
832                 librpma_td_verror(td, ret, "rpma_utils_get_ibv_context");
833                 return -1;
834         }
835
836         /* allocate server's data */
837         csd = calloc(1, sizeof(*csd));
838         if (csd == NULL) {
839                 td_verror(td, errno, "calloc");
840                 return -1;
841         }
842
843         /* create a new peer object */
844         if ((ret = rpma_peer_new(dev, &csd->peer))) {
845                 librpma_td_verror(td, ret, "rpma_peer_new");
846                 goto err_free_csd;
847         }
848
849         td->io_ops_data = csd;
850
851         return 0;
852
853 err_free_csd:
854         free(csd);
855
856         return -1;
857 }
858
859 void librpma_fio_server_cleanup(struct thread_data *td)
860 {
861         struct librpma_fio_server_data *csd =  td->io_ops_data;
862         int ret;
863
864         if (csd == NULL)
865                 return;
866
867         /* free the peer */
868         if ((ret = rpma_peer_delete(&csd->peer)))
869                 librpma_td_verror(td, ret, "rpma_peer_delete");
870
871         free(csd);
872 }
873
874 int librpma_fio_server_open_file(struct thread_data *td, struct fio_file *f,
875                 struct rpma_conn_cfg *cfg)
876 {
877         struct librpma_fio_server_data *csd = td->io_ops_data;
878         struct librpma_fio_options_values *o = td->eo;
879         enum rpma_conn_event conn_event = RPMA_CONN_UNDEFINED;
880         struct librpma_fio_workspace ws = {0};
881         struct rpma_conn_private_data pdata;
882         uint32_t max_msg_num;
883         struct rpma_conn_req *conn_req;
884         struct rpma_conn *conn;
885         struct rpma_mr_local *mr;
886         char port_td[LIBRPMA_FIO_PORT_STR_LEN_MAX];
887         struct rpma_ep *ep;
888         size_t mem_size = td->o.size;
889         size_t mr_desc_size;
890         void *ws_ptr;
891         bool is_dram;
892         int usage_mem_type;
893         int ret;
894
895         if (!f->file_name) {
896                 log_err("fio: filename is not set\n");
897                 return -1;
898         }
899
900         /* start a listening endpoint at addr:port */
901         if (librpma_fio_td_port(o->port, td, port_td))
902                 return -1;
903
904         if ((ret = rpma_ep_listen(csd->peer, o->server_ip, port_td, &ep))) {
905                 librpma_td_verror(td, ret, "rpma_ep_listen");
906                 return -1;
907         }
908
909         is_dram = !strcmp(f->file_name, "malloc");
910         if (is_dram) {
911                 /* allocation from DRAM using posix_memalign() */
912                 ws_ptr = librpma_fio_allocate_dram(td, mem_size, &csd->mem);
913                 usage_mem_type = RPMA_MR_USAGE_FLUSH_TYPE_VISIBILITY;
914         } else {
915                 /* allocation from PMEM using pmem_map_file() */
916                 ws_ptr = librpma_fio_allocate_pmem(td, f, mem_size, &csd->mem);
917                 usage_mem_type = RPMA_MR_USAGE_FLUSH_TYPE_PERSISTENT;
918         }
919
920         if (ws_ptr == NULL)
921                 goto err_ep_shutdown;
922
923         f->real_file_size = mem_size;
924
925         if ((ret = rpma_mr_reg(csd->peer, ws_ptr, mem_size,
926                         RPMA_MR_USAGE_READ_DST | RPMA_MR_USAGE_READ_SRC |
927                         RPMA_MR_USAGE_WRITE_DST | RPMA_MR_USAGE_WRITE_SRC |
928                         usage_mem_type, &mr))) {
929                 librpma_td_verror(td, ret, "rpma_mr_reg");
930                 goto err_free;
931         }
932
933         if (!is_dram && f->filetype == FIO_TYPE_FILE) {
934                 ret = rpma_mr_advise(mr, 0, mem_size,
935                                 IBV_ADVISE_MR_ADVICE_PREFETCH_WRITE,
936                                 IBV_ADVISE_MR_FLAG_FLUSH);
937                 if (ret) {
938                         librpma_td_verror(td, ret, "rpma_mr_advise");
939                         /* an invalid argument is an error */
940                         if (ret == RPMA_E_INVAL)
941                                 goto err_mr_dereg;
942
943                         /* log_err used instead of log_info to avoid corruption of the JSON output */
944                         log_err("Note: having rpma_mr_advise(3) failed because of RPMA_E_NOSUPP or RPMA_E_PROVIDER may come with a performance penalty, but it is not a blocker for running the benchmark.\n");
945                 }
946         }
947
948         /* get size of the memory region's descriptor */
949         if ((ret = rpma_mr_get_descriptor_size(mr, &mr_desc_size))) {
950                 librpma_td_verror(td, ret, "rpma_mr_get_descriptor_size");
951                 goto err_mr_dereg;
952         }
953
954         /* verify size of the memory region's descriptor */
955         if (mr_desc_size > LIBRPMA_FIO_DESCRIPTOR_MAX_SIZE) {
956                 log_err(
957                         "size of the memory region's descriptor is too big (max=%i)\n",
958                         LIBRPMA_FIO_DESCRIPTOR_MAX_SIZE);
959                 goto err_mr_dereg;
960         }
961
962         /* get the memory region's descriptor */
963         if ((ret = rpma_mr_get_descriptor(mr, &ws.descriptor[0]))) {
964                 librpma_td_verror(td, ret, "rpma_mr_get_descriptor");
965                 goto err_mr_dereg;
966         }
967
968         if (cfg != NULL) {
969                 if ((ret = rpma_conn_cfg_get_rq_size(cfg, &max_msg_num))) {
970                         librpma_td_verror(td, ret, "rpma_conn_cfg_get_rq_size");
971                         goto err_mr_dereg;
972                 }
973
974                 /* verify whether iodepth fits into uint16_t */
975                 if (max_msg_num > UINT16_MAX) {
976                         log_err("fio: iodepth too big (%u > %u)\n",
977                                 max_msg_num, UINT16_MAX);
978                         return -1;
979                 }
980
981                 ws.max_msg_num = max_msg_num;
982         }
983
984         /* prepare a workspace description */
985         ws.direct_write_to_pmem = o->direct_write_to_pmem;
986         ws.mr_desc_size = mr_desc_size;
987         pdata.ptr = &ws;
988         pdata.len = sizeof(ws);
989
990         /* receive an incoming connection request */
991         if ((ret = rpma_ep_next_conn_req(ep, cfg, &conn_req))) {
992                 librpma_td_verror(td, ret, "rpma_ep_next_conn_req");
993                 goto err_mr_dereg;
994         }
995
996         if (csd->prepare_connection && csd->prepare_connection(td, conn_req))
997                 goto err_req_delete;
998
999         /* accept the connection request and obtain the connection object */
1000         if ((ret = rpma_conn_req_connect(&conn_req, &pdata, &conn))) {
1001                 librpma_td_verror(td, ret, "rpma_conn_req_connect");
1002                 goto err_req_delete;
1003         }
1004
1005         /* wait for the connection to be established */
1006         if ((ret = rpma_conn_next_event(conn, &conn_event))) {
1007                 librpma_td_verror(td, ret, "rpma_conn_next_event");
1008                 goto err_conn_delete;
1009         } else if (conn_event != RPMA_CONN_ESTABLISHED) {
1010                 log_err("rpma_conn_next_event returned an unexptected event\n");
1011                 goto err_conn_delete;
1012         }
1013
1014         /* end-point is no longer needed */
1015         (void) rpma_ep_shutdown(&ep);
1016
1017         csd->ws_mr = mr;
1018         csd->ws_ptr = ws_ptr;
1019         csd->conn = conn;
1020
1021         /* get the connection's main CQ */
1022         if ((ret = rpma_conn_get_cq(csd->conn, &csd->cq))) {
1023                 librpma_td_verror(td, ret, "rpma_conn_get_cq");
1024                 goto err_conn_delete;
1025         }
1026
1027         return 0;
1028
1029 err_conn_delete:
1030         (void) rpma_conn_delete(&conn);
1031
1032 err_req_delete:
1033         (void) rpma_conn_req_delete(&conn_req);
1034
1035 err_mr_dereg:
1036         (void) rpma_mr_dereg(&mr);
1037
1038 err_free:
1039         librpma_fio_free(&csd->mem);
1040
1041 err_ep_shutdown:
1042         (void) rpma_ep_shutdown(&ep);
1043
1044         return -1;
1045 }
1046
1047 int librpma_fio_server_close_file(struct thread_data *td, struct fio_file *f)
1048 {
1049         struct librpma_fio_server_data *csd = td->io_ops_data;
1050         enum rpma_conn_event conn_event = RPMA_CONN_UNDEFINED;
1051         int rv = 0;
1052         int ret;
1053
1054         /* wait for the connection to be closed */
1055         ret = rpma_conn_next_event(csd->conn, &conn_event);
1056         if (!ret && conn_event != RPMA_CONN_CLOSED) {
1057                 log_err("rpma_conn_next_event returned an unexptected event\n");
1058                 rv = -1;
1059         }
1060
1061         if ((ret = rpma_conn_disconnect(csd->conn))) {
1062                 librpma_td_verror(td, ret, "rpma_conn_disconnect");
1063                 rv = -1;
1064         }
1065
1066         if ((ret = rpma_conn_delete(&csd->conn))) {
1067                 librpma_td_verror(td, ret, "rpma_conn_delete");
1068                 rv = -1;
1069         }
1070
1071         if ((ret = rpma_mr_dereg(&csd->ws_mr))) {
1072                 librpma_td_verror(td, ret, "rpma_mr_dereg");
1073                 rv = -1;
1074         }
1075
1076         librpma_fio_free(&csd->mem);
1077
1078         return rv;
1079 }