9d6ebf38ee865130ae9450dc79719dbbecc3f419
[fio.git] / engines / librpma_fio.c
1 /*
2  * librpma_fio: librpma_apm and librpma_gpspm engines' common part.
3  *
4  * Copyright 2021, Intel Corporation
5  *
6  * This program is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License,
8  * version 2 as published by the Free Software Foundation..
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13  * GNU General Public License for more details.
14  */
15
16 #include "librpma_fio.h"
17
18 #include <libpmem.h>
19
20 struct fio_option librpma_fio_options[] = {
21         {
22                 .name   = "serverip",
23                 .lname  = "rpma_server_ip",
24                 .type   = FIO_OPT_STR_STORE,
25                 .off1   = offsetof(struct librpma_fio_options_values, server_ip),
26                 .help   = "IP address the server is listening on",
27                 .def    = "",
28                 .category = FIO_OPT_C_ENGINE,
29                 .group  = FIO_OPT_G_LIBRPMA,
30         },
31         {
32                 .name   = "port",
33                 .lname  = "rpma_server port",
34                 .type   = FIO_OPT_STR_STORE,
35                 .off1   = offsetof(struct librpma_fio_options_values, port),
36                 .help   = "port the server is listening on",
37                 .def    = "7204",
38                 .category = FIO_OPT_C_ENGINE,
39                 .group  = FIO_OPT_G_LIBRPMA,
40         },
41         {
42                 .name   = "direct_write_to_pmem",
43                 .lname  = "Direct Write to PMem (via RDMA) from the remote host is possible",
44                 .type   = FIO_OPT_BOOL,
45                 .off1   = offsetof(struct librpma_fio_options_values,
46                                         direct_write_to_pmem),
47                 .help   = "Set to true ONLY when Direct Write to PMem from the remote host is possible (https://pmem.io/rpma/documentation/basic-direct-write-to-pmem.html)",
48                 .def    = "",
49                 .category = FIO_OPT_C_ENGINE,
50                 .group  = FIO_OPT_G_LIBRPMA,
51         },
52         {
53                 .name   = "busy_wait_polling",
54                 .lname  = "Set to 0 to wait for completion instead of busy-wait polling completion.",
55                 .type   = FIO_OPT_BOOL,
56                 .off1   = offsetof(struct librpma_fio_options_values,
57                                         busy_wait_polling),
58                 .help   = "Set to false if you want to reduce CPU usage",
59                 .def    = "1",
60                 .category = FIO_OPT_C_ENGINE,
61                 .group  = FIO_OPT_G_LIBRPMA,
62         },
63         {
64                 .name   = NULL,
65         },
66 };
67
68 int librpma_fio_td_port(const char *port_base_str, struct thread_data *td,
69                 char *port_out)
70 {
71         unsigned long int port_ul = strtoul(port_base_str, NULL, 10);
72         unsigned int port_new;
73
74         port_out[0] = '\0';
75
76         if (port_ul == ULONG_MAX) {
77                 td_verror(td, errno, "strtoul");
78                 return -1;
79         }
80         port_ul += td->thread_number - 1;
81         if (port_ul >= UINT_MAX) {
82                 log_err("[%u] port number (%lu) bigger than UINT_MAX\n",
83                         td->thread_number, port_ul);
84                 return -1;
85         }
86
87         port_new = port_ul;
88         snprintf(port_out, LIBRPMA_FIO_PORT_STR_LEN_MAX - 1, "%u", port_new);
89
90         return 0;
91 }
92
93 char *librpma_fio_allocate_dram(struct thread_data *td, size_t size,
94         struct librpma_fio_mem *mem)
95 {
96         char *mem_ptr = NULL;
97         int ret;
98
99         if ((ret = posix_memalign((void **)&mem_ptr, page_size, size))) {
100                 log_err("fio: posix_memalign() failed\n");
101                 td_verror(td, ret, "posix_memalign");
102                 return NULL;
103         }
104
105         mem->mem_ptr = mem_ptr;
106         mem->size_mmap = 0;
107
108         return mem_ptr;
109 }
110
111 char *librpma_fio_allocate_pmem(struct thread_data *td, struct fio_file *f,
112                 size_t size, struct librpma_fio_mem *mem)
113 {
114         size_t size_mmap = 0;
115         char *mem_ptr = NULL;
116         int is_pmem = 0;
117         size_t ws_offset;
118
119         if (size % page_size) {
120                 log_err("fio: size (%zu) is not aligned to page size (%zu)\n",
121                         size, page_size);
122                 return NULL;
123         }
124
125         if (f->filetype == FIO_TYPE_CHAR) {
126                 /* Each thread uses a separate offset within DeviceDAX. */
127                 ws_offset = (td->thread_number - 1) * size;
128         } else {
129                 /* Each thread uses a separate FileSystemDAX file. No offset is needed. */
130                 ws_offset = 0;
131         }
132
133         if (!f->file_name) {
134                 log_err("fio: filename is not set\n");
135                 return NULL;
136         }
137
138         /* map the file */
139         mem_ptr = pmem_map_file(f->file_name, 0 /* len */, 0 /* flags */,
140                         0 /* mode */, &size_mmap, &is_pmem);
141         if (mem_ptr == NULL) {
142                 log_err("fio: pmem_map_file(%s) failed\n", f->file_name);
143                 /* pmem_map_file() sets errno on failure */
144                 td_verror(td, errno, "pmem_map_file");
145                 return NULL;
146         }
147
148         /* pmem is expected */
149         if (!is_pmem) {
150                 log_err("fio: %s is not located in persistent memory\n",
151                         f->file_name);
152                 goto err_unmap;
153         }
154
155         /* check size of allocated persistent memory */
156         if (size_mmap < ws_offset + size) {
157                 log_err(
158                         "fio: %s is too small to handle so many threads (%zu < %zu)\n",
159                         f->file_name, size_mmap, ws_offset + size);
160                 goto err_unmap;
161         }
162
163         log_info("fio: size of memory mapped from the file %s: %zu\n",
164                 f->file_name, size_mmap);
165
166         mem->mem_ptr = mem_ptr;
167         mem->size_mmap = size_mmap;
168
169         return mem_ptr + ws_offset;
170
171 err_unmap:
172         (void) pmem_unmap(mem_ptr, size_mmap);
173         return NULL;
174 }
175
176 void librpma_fio_free(struct librpma_fio_mem *mem)
177 {
178         if (mem->size_mmap)
179                 (void) pmem_unmap(mem->mem_ptr, mem->size_mmap);
180         else
181                 free(mem->mem_ptr);
182 }
183
184 #define LIBRPMA_FIO_RETRY_MAX_NO        10
185 #define LIBRPMA_FIO_RETRY_DELAY_S       5
186
187 int librpma_fio_client_init(struct thread_data *td,
188                 struct rpma_conn_cfg *cfg)
189 {
190         struct librpma_fio_client_data *ccd;
191         struct librpma_fio_options_values *o = td->eo;
192         struct ibv_context *dev = NULL;
193         char port_td[LIBRPMA_FIO_PORT_STR_LEN_MAX];
194         struct rpma_conn_req *req = NULL;
195         enum rpma_conn_event event;
196         struct rpma_conn_private_data pdata;
197         enum rpma_log_level log_level_aux = RPMA_LOG_LEVEL_WARNING;
198         int remote_flush_type;
199         int retry;
200         int ret;
201
202         /* --debug=net sets RPMA_LOG_THRESHOLD_AUX to RPMA_LOG_LEVEL_INFO */
203 #ifdef FIO_INC_DEBUG
204         if ((1UL << FD_NET) & fio_debug)
205                 log_level_aux = RPMA_LOG_LEVEL_INFO;
206 #endif
207
208         /* configure logging thresholds to see more details */
209         rpma_log_set_threshold(RPMA_LOG_THRESHOLD, RPMA_LOG_LEVEL_INFO);
210         rpma_log_set_threshold(RPMA_LOG_THRESHOLD_AUX, log_level_aux);
211
212         /* obtain an IBV context for a remote IP address */
213         if ((ret = rpma_utils_get_ibv_context(o->server_ip,
214                         RPMA_UTIL_IBV_CONTEXT_REMOTE, &dev))) {
215                 librpma_td_verror(td, ret, "rpma_utils_get_ibv_context");
216                 return -1;
217         }
218
219         /* allocate client's data */
220         ccd = calloc(1, sizeof(*ccd));
221         if (ccd == NULL) {
222                 td_verror(td, errno, "calloc");
223                 return -1;
224         }
225
226         /* allocate all in-memory queues */
227         ccd->io_us_queued = calloc(td->o.iodepth, sizeof(*ccd->io_us_queued));
228         if (ccd->io_us_queued == NULL) {
229                 td_verror(td, errno, "calloc");
230                 goto err_free_ccd;
231         }
232
233         ccd->io_us_flight = calloc(td->o.iodepth, sizeof(*ccd->io_us_flight));
234         if (ccd->io_us_flight == NULL) {
235                 td_verror(td, errno, "calloc");
236                 goto err_free_io_u_queues;
237         }
238
239         ccd->io_us_completed = calloc(td->o.iodepth,
240                         sizeof(*ccd->io_us_completed));
241         if (ccd->io_us_completed == NULL) {
242                 td_verror(td, errno, "calloc");
243                 goto err_free_io_u_queues;
244         }
245
246         /* create a new peer object */
247         if ((ret = rpma_peer_new(dev, &ccd->peer))) {
248                 librpma_td_verror(td, ret, "rpma_peer_new");
249                 goto err_free_io_u_queues;
250         }
251
252         /* create a connection request */
253         if (librpma_fio_td_port(o->port, td, port_td))
254                 goto err_peer_delete;
255
256         for (retry = 0; retry < LIBRPMA_FIO_RETRY_MAX_NO; retry++) {
257                 if ((ret = rpma_conn_req_new(ccd->peer, o->server_ip, port_td,
258                                 cfg, &req))) {
259                         librpma_td_verror(td, ret, "rpma_conn_req_new");
260                         goto err_peer_delete;
261                 }
262
263                 /*
264                  * Connect the connection request
265                  * and obtain the connection object.
266                  */
267                 if ((ret = rpma_conn_req_connect(&req, NULL, &ccd->conn))) {
268                         librpma_td_verror(td, ret, "rpma_conn_req_connect");
269                         goto err_req_delete;
270                 }
271
272                 /* wait for the connection to establish */
273                 if ((ret = rpma_conn_next_event(ccd->conn, &event))) {
274                         librpma_td_verror(td, ret, "rpma_conn_next_event");
275                         goto err_conn_delete;
276                 } else if (event == RPMA_CONN_ESTABLISHED) {
277                         break;
278                 } else if (event == RPMA_CONN_REJECTED) {
279                         (void) rpma_conn_disconnect(ccd->conn);
280                         (void) rpma_conn_delete(&ccd->conn);
281                         if (retry < LIBRPMA_FIO_RETRY_MAX_NO - 1) {
282                                 log_err("Thread [%d]: Retrying (#%i) ...\n",
283                                         td->thread_number, retry + 1);
284                                 sleep(LIBRPMA_FIO_RETRY_DELAY_S);
285                         } else {
286                                 log_err(
287                                         "Thread [%d]: The maximum number of retries exceeded. Closing.\n",
288                                         td->thread_number);
289                         }
290                 } else {
291                         log_err(
292                                 "rpma_conn_next_event returned an unexptected event: (%s != RPMA_CONN_ESTABLISHED)\n",
293                                 rpma_utils_conn_event_2str(event));
294                         goto err_conn_delete;
295                 }
296         }
297
298         if (retry > 0)
299                 log_err("Thread [%d]: Connected after retry #%i\n",
300                         td->thread_number, retry);
301
302         if (ccd->conn == NULL)
303                 goto err_peer_delete;
304
305         /* get the connection's private data sent from the server */
306         if ((ret = rpma_conn_get_private_data(ccd->conn, &pdata))) {
307                 librpma_td_verror(td, ret, "rpma_conn_get_private_data");
308                 goto err_conn_delete;
309         }
310
311         /* get the server's workspace representation */
312         ccd->ws = pdata.ptr;
313
314         /* create the server's memory representation */
315         if ((ret = rpma_mr_remote_from_descriptor(&ccd->ws->descriptor[0],
316                         ccd->ws->mr_desc_size, &ccd->server_mr))) {
317                 librpma_td_verror(td, ret, "rpma_mr_remote_from_descriptor");
318                 goto err_conn_delete;
319         }
320
321         /* get the total size of the shared server memory */
322         if ((ret = rpma_mr_remote_get_size(ccd->server_mr, &ccd->ws_size))) {
323                 librpma_td_verror(td, ret, "rpma_mr_remote_get_size");
324                 goto err_conn_delete;
325         }
326
327         /* get flush type of the remote node */
328         if ((ret = rpma_mr_remote_get_flush_type(ccd->server_mr,
329                         &remote_flush_type))) {
330                 librpma_td_verror(td, ret, "rpma_mr_remote_get_flush_type");
331                 goto err_conn_delete;
332         }
333
334         ccd->server_mr_flush_type =
335                 (remote_flush_type & RPMA_MR_USAGE_FLUSH_TYPE_PERSISTENT) ?
336                 RPMA_FLUSH_TYPE_PERSISTENT : RPMA_FLUSH_TYPE_VISIBILITY;
337
338         /*
339          * Assure an io_us buffer allocation is page-size-aligned which is required
340          * to register for RDMA. User-provided value is intentionally ignored.
341          */
342         td->o.mem_align = page_size;
343
344         td->io_ops_data = ccd;
345
346         return 0;
347
348 err_conn_delete:
349         (void) rpma_conn_disconnect(ccd->conn);
350         (void) rpma_conn_delete(&ccd->conn);
351
352 err_req_delete:
353         (void) rpma_conn_req_delete(&req);
354
355 err_peer_delete:
356         (void) rpma_peer_delete(&ccd->peer);
357
358 err_free_io_u_queues:
359         free(ccd->io_us_queued);
360         free(ccd->io_us_flight);
361         free(ccd->io_us_completed);
362
363 err_free_ccd:
364         free(ccd);
365
366         return -1;
367 }
368
369 void librpma_fio_client_cleanup(struct thread_data *td)
370 {
371         struct librpma_fio_client_data *ccd = td->io_ops_data;
372         enum rpma_conn_event ev;
373         int ret;
374
375         if (ccd == NULL)
376                 return;
377
378         /* delete the iou's memory registration */
379         if ((ret = rpma_mr_dereg(&ccd->orig_mr)))
380                 librpma_td_verror(td, ret, "rpma_mr_dereg");
381         /* delete the iou's memory registration */
382         if ((ret = rpma_mr_remote_delete(&ccd->server_mr)))
383                 librpma_td_verror(td, ret, "rpma_mr_remote_delete");
384         /* initiate disconnection */
385         if ((ret = rpma_conn_disconnect(ccd->conn)))
386                 librpma_td_verror(td, ret, "rpma_conn_disconnect");
387         /* wait for disconnection to end up */
388         if ((ret = rpma_conn_next_event(ccd->conn, &ev))) {
389                 librpma_td_verror(td, ret, "rpma_conn_next_event");
390         } else if (ev != RPMA_CONN_CLOSED) {
391                 log_err(
392                         "client_cleanup received an unexpected event (%s != RPMA_CONN_CLOSED)\n",
393                         rpma_utils_conn_event_2str(ev));
394         }
395         /* delete the connection */
396         if ((ret = rpma_conn_delete(&ccd->conn)))
397                 librpma_td_verror(td, ret, "rpma_conn_delete");
398         /* delete the peer */
399         if ((ret = rpma_peer_delete(&ccd->peer)))
400                 librpma_td_verror(td, ret, "rpma_peer_delete");
401         /* free the software queues */
402         free(ccd->io_us_queued);
403         free(ccd->io_us_flight);
404         free(ccd->io_us_completed);
405         free(ccd);
406         td->io_ops_data = NULL; /* zero ccd */
407 }
408
409 int librpma_fio_file_nop(struct thread_data *td, struct fio_file *f)
410 {
411         /* NOP */
412         return 0;
413 }
414
415 int librpma_fio_client_post_init(struct thread_data *td)
416 {
417         struct librpma_fio_client_data *ccd =  td->io_ops_data;
418         size_t io_us_size;
419         int ret;
420
421         /*
422          * td->orig_buffer is not aligned. The engine requires aligned io_us
423          * so FIO alignes up the address using the formula below.
424          */
425         ccd->orig_buffer_aligned = PTR_ALIGN(td->orig_buffer, page_mask) +
426                         td->o.mem_align;
427
428         /*
429          * td->orig_buffer_size beside the space really consumed by io_us
430          * has paddings which can be omitted for the memory registration.
431          */
432         io_us_size = (unsigned long long)td_max_bs(td) *
433                         (unsigned long long)td->o.iodepth;
434
435         if ((ret = rpma_mr_reg(ccd->peer, ccd->orig_buffer_aligned, io_us_size,
436                         RPMA_MR_USAGE_READ_DST | RPMA_MR_USAGE_READ_SRC |
437                         RPMA_MR_USAGE_WRITE_DST | RPMA_MR_USAGE_WRITE_SRC |
438                         RPMA_MR_USAGE_FLUSH_TYPE_PERSISTENT, &ccd->orig_mr)))
439                 librpma_td_verror(td, ret, "rpma_mr_reg");
440         return ret;
441 }
442
443 int librpma_fio_client_get_file_size(struct thread_data *td,
444                 struct fio_file *f)
445 {
446         struct librpma_fio_client_data *ccd = td->io_ops_data;
447
448         f->real_file_size = ccd->ws_size;
449         fio_file_set_size_known(f);
450
451         return 0;
452 }
453
454 static enum fio_q_status client_queue_sync(struct thread_data *td,
455                 struct io_u *io_u)
456 {
457         struct librpma_fio_client_data *ccd = td->io_ops_data;
458         struct rpma_completion cmpl;
459         unsigned io_u_index;
460         int ret;
461
462         /* execute io_u */
463         if (io_u->ddir == DDIR_READ) {
464                 /* post an RDMA read operation */
465                 if (librpma_fio_client_io_read(td, io_u,
466                                 RPMA_F_COMPLETION_ALWAYS))
467                         goto err;
468         } else if (io_u->ddir == DDIR_WRITE) {
469                 /* post an RDMA write operation */
470                 if (librpma_fio_client_io_write(td, io_u))
471                         goto err;
472                 if (ccd->flush(td, io_u, io_u, io_u->xfer_buflen))
473                         goto err;
474         } else {
475                 log_err("unsupported IO mode: %s\n", io_ddir_name(io_u->ddir));
476                 goto err;
477         }
478
479         do {
480                 /* get a completion */
481                 ret = rpma_conn_completion_get(ccd->conn, &cmpl);
482                 if (ret == RPMA_E_NO_COMPLETION) {
483                         /* lack of completion is not an error */
484                         continue;
485                 } else if (ret != 0) {
486                         /* an error occurred */
487                         librpma_td_verror(td, ret, "rpma_conn_completion_get");
488                         goto err;
489                 }
490
491                 /* if io_us has completed with an error */
492                 if (cmpl.op_status != IBV_WC_SUCCESS)
493                         goto err;
494
495                 if (cmpl.op == RPMA_OP_SEND)
496                         ++ccd->op_send_completed;
497                 else {
498                         if (cmpl.op == RPMA_OP_RECV)
499                                 ++ccd->op_recv_completed;
500
501                         break;
502                 }
503         } while (1);
504
505         if (ccd->get_io_u_index(&cmpl, &io_u_index) != 1)
506                 goto err;
507
508         if (io_u->index != io_u_index) {
509                 log_err(
510                         "no matching io_u for received completion found (io_u_index=%u)\n",
511                         io_u_index);
512                 goto err;
513         }
514
515         /* make sure all SENDs are completed before exit - clean up SQ */
516         if (librpma_fio_client_io_complete_all_sends(td))
517                 goto err;
518
519         return FIO_Q_COMPLETED;
520
521 err:
522         io_u->error = -1;
523         return FIO_Q_COMPLETED;
524 }
525
526 enum fio_q_status librpma_fio_client_queue(struct thread_data *td,
527                 struct io_u *io_u)
528 {
529         struct librpma_fio_client_data *ccd = td->io_ops_data;
530
531         if (ccd->io_u_queued_nr == (int)td->o.iodepth)
532                 return FIO_Q_BUSY;
533
534         if (td->o.sync_io)
535                 return client_queue_sync(td, io_u);
536
537         /* io_u -> queued[] */
538         ccd->io_us_queued[ccd->io_u_queued_nr] = io_u;
539         ccd->io_u_queued_nr++;
540
541         return FIO_Q_QUEUED;
542 }
543
544 int librpma_fio_client_commit(struct thread_data *td)
545 {
546         struct librpma_fio_client_data *ccd = td->io_ops_data;
547         int flags = RPMA_F_COMPLETION_ON_ERROR;
548         struct timespec now;
549         bool fill_time;
550         int i;
551         struct io_u *flush_first_io_u = NULL;
552         unsigned long long int flush_len = 0;
553
554         if (!ccd->io_us_queued)
555                 return -1;
556
557         /* execute all io_us from queued[] */
558         for (i = 0; i < ccd->io_u_queued_nr; i++) {
559                 struct io_u *io_u = ccd->io_us_queued[i];
560
561                 if (io_u->ddir == DDIR_READ) {
562                         if (i + 1 == ccd->io_u_queued_nr ||
563                             ccd->io_us_queued[i + 1]->ddir == DDIR_WRITE)
564                                 flags = RPMA_F_COMPLETION_ALWAYS;
565                         /* post an RDMA read operation */
566                         if (librpma_fio_client_io_read(td, io_u, flags))
567                                 return -1;
568                 } else if (io_u->ddir == DDIR_WRITE) {
569                         /* post an RDMA write operation */
570                         if (librpma_fio_client_io_write(td, io_u))
571                                 return -1;
572
573                         /* cache the first io_u in the sequence */
574                         if (flush_first_io_u == NULL)
575                                 flush_first_io_u = io_u;
576
577                         /*
578                          * the flush length is the sum of all io_u's creating
579                          * the sequence
580                          */
581                         flush_len += io_u->xfer_buflen;
582
583                         /*
584                          * if io_u's are random the rpma_flush is required
585                          * after each one of them
586                          */
587                         if (!td_random(td)) {
588                                 /*
589                                  * When the io_u's are sequential and
590                                  * the current io_u is not the last one and
591                                  * the next one is also a write operation
592                                  * the flush can be postponed by one io_u and
593                                  * cover all of them which build a continuous
594                                  * sequence.
595                                  */
596                                 if ((i + 1 < ccd->io_u_queued_nr) &&
597                                     (ccd->io_us_queued[i + 1]->ddir == DDIR_WRITE))
598                                         continue;
599                         }
600
601                         /* flush all writes which build a continuous sequence */
602                         if (ccd->flush(td, flush_first_io_u, io_u, flush_len))
603                                 return -1;
604
605                         /*
606                          * reset the flush parameters in preparation for
607                          * the next one
608                          */
609                         flush_first_io_u = NULL;
610                         flush_len = 0;
611                 } else {
612                         log_err("unsupported IO mode: %s\n",
613                                 io_ddir_name(io_u->ddir));
614                         return -1;
615                 }
616         }
617
618         if ((fill_time = fio_fill_issue_time(td)))
619                 fio_gettime(&now, NULL);
620
621         /* move executed io_us from queued[] to flight[] */
622         for (i = 0; i < ccd->io_u_queued_nr; i++) {
623                 struct io_u *io_u = ccd->io_us_queued[i];
624
625                 /* FIO does not do this if the engine is asynchronous */
626                 if (fill_time)
627                         memcpy(&io_u->issue_time, &now, sizeof(now));
628
629                 /* move executed io_us from queued[] to flight[] */
630                 ccd->io_us_flight[ccd->io_u_flight_nr] = io_u;
631                 ccd->io_u_flight_nr++;
632
633                 /*
634                  * FIO says:
635                  * If an engine has the commit hook
636                  * it has to call io_u_queued() itself.
637                  */
638                 io_u_queued(td, io_u);
639         }
640
641         /* FIO does not do this if an engine has the commit hook. */
642         io_u_mark_submit(td, ccd->io_u_queued_nr);
643         ccd->io_u_queued_nr = 0;
644
645         return 0;
646 }
647
648 /*
649  * RETURN VALUE
650  * - > 0  - a number of completed io_us
651  * -   0  - when no complicitions received
652  * - (-1) - when an error occurred
653  */
654 static int client_getevent_process(struct thread_data *td)
655 {
656         struct librpma_fio_client_data *ccd = td->io_ops_data;
657         struct rpma_completion cmpl;
658         /* io_u->index of completed io_u (cmpl.op_context) */
659         unsigned int io_u_index;
660         /* # of completed io_us */
661         int cmpl_num = 0;
662         /* helpers */
663         struct io_u *io_u;
664         int i;
665         int ret;
666
667         /* get a completion */
668         if ((ret = rpma_conn_completion_get(ccd->conn, &cmpl))) {
669                 /* lack of completion is not an error */
670                 if (ret == RPMA_E_NO_COMPLETION) {
671                         /* lack of completion is not an error */
672                         return 0;
673                 }
674
675                 /* an error occurred */
676                 librpma_td_verror(td, ret, "rpma_conn_completion_get");
677                 return -1;
678         }
679
680         /* if io_us has completed with an error */
681         if (cmpl.op_status != IBV_WC_SUCCESS) {
682                 td->error = cmpl.op_status;
683                 return -1;
684         }
685
686         if (cmpl.op == RPMA_OP_SEND)
687                 ++ccd->op_send_completed;
688         else if (cmpl.op == RPMA_OP_RECV)
689                 ++ccd->op_recv_completed;
690
691         if ((ret = ccd->get_io_u_index(&cmpl, &io_u_index)) != 1)
692                 return ret;
693
694         /* look for an io_u being completed */
695         for (i = 0; i < ccd->io_u_flight_nr; ++i) {
696                 if (ccd->io_us_flight[i]->index == io_u_index) {
697                         cmpl_num = i + 1;
698                         break;
699                 }
700         }
701
702         /* if no matching io_u has been found */
703         if (cmpl_num == 0) {
704                 log_err(
705                         "no matching io_u for received completion found (io_u_index=%u)\n",
706                         io_u_index);
707                 return -1;
708         }
709
710         /* move completed io_us to the completed in-memory queue */
711         for (i = 0; i < cmpl_num; ++i) {
712                 /* get and prepare io_u */
713                 io_u = ccd->io_us_flight[i];
714
715                 /* append to the queue */
716                 ccd->io_us_completed[ccd->io_u_completed_nr] = io_u;
717                 ccd->io_u_completed_nr++;
718         }
719
720         /* remove completed io_us from the flight queue */
721         for (i = cmpl_num; i < ccd->io_u_flight_nr; ++i)
722                 ccd->io_us_flight[i - cmpl_num] = ccd->io_us_flight[i];
723         ccd->io_u_flight_nr -= cmpl_num;
724
725         return cmpl_num;
726 }
727
728 int librpma_fio_client_getevents(struct thread_data *td, unsigned int min,
729                 unsigned int max, const struct timespec *t)
730 {
731         struct librpma_fio_client_data *ccd = td->io_ops_data;
732         /* total # of completed io_us */
733         int cmpl_num_total = 0;
734         /* # of completed io_us from a single event */
735         int cmpl_num;
736
737         do {
738                 cmpl_num = client_getevent_process(td);
739                 if (cmpl_num > 0) {
740                         /* new completions collected */
741                         cmpl_num_total += cmpl_num;
742                 } else if (cmpl_num == 0) {
743                         /*
744                          * It is required to make sure that CQEs for SENDs
745                          * will flow at least at the same pace as CQEs for RECVs.
746                          */
747                         if (cmpl_num_total >= min &&
748                             ccd->op_send_completed >= ccd->op_recv_completed)
749                                 break;
750
751                         /*
752                          * To reduce CPU consumption one can use
753                          * the rpma_conn_completion_wait() function.
754                          * Note this greatly increase the latency
755                          * and make the results less stable.
756                          * The bandwidth stays more or less the same.
757                          */
758                 } else {
759                         /* an error occurred */
760                         return -1;
761                 }
762
763                 /*
764                  * The expected max can be exceeded if CQEs for RECVs will come up
765                  * faster than CQEs for SENDs. But it is required to make sure CQEs for
766                  * SENDs will flow at least at the same pace as CQEs for RECVs.
767                  */
768         } while (cmpl_num_total < max ||
769                         ccd->op_send_completed < ccd->op_recv_completed);
770
771         /*
772          * All posted SENDs are completed and RECVs for them (responses) are
773          * completed. This is the initial situation so the counters are reset.
774          */
775         if (ccd->op_send_posted == ccd->op_send_completed &&
776                         ccd->op_send_completed == ccd->op_recv_completed) {
777                 ccd->op_send_posted = 0;
778                 ccd->op_send_completed = 0;
779                 ccd->op_recv_completed = 0;
780         }
781
782         return cmpl_num_total;
783 }
784
785 struct io_u *librpma_fio_client_event(struct thread_data *td, int event)
786 {
787         struct librpma_fio_client_data *ccd = td->io_ops_data;
788         struct io_u *io_u;
789         int i;
790
791         /* get the first io_u from the queue */
792         io_u = ccd->io_us_completed[0];
793
794         /* remove the first io_u from the queue */
795         for (i = 1; i < ccd->io_u_completed_nr; ++i)
796                 ccd->io_us_completed[i - 1] = ccd->io_us_completed[i];
797         ccd->io_u_completed_nr--;
798
799         dprint_io_u(io_u, "client_event");
800
801         return io_u;
802 }
803
804 char *librpma_fio_client_errdetails(struct io_u *io_u)
805 {
806         /* get the string representation of an error */
807         enum ibv_wc_status status = io_u->error;
808         const char *status_str = ibv_wc_status_str(status);
809
810         char *details = strdup(status_str);
811         if (details == NULL) {
812                 fprintf(stderr, "Error: %s\n", status_str);
813                 fprintf(stderr, "Fatal error: out of memory. Aborting.\n");
814                 abort();
815         }
816
817         /* FIO frees the returned string when it becomes obsolete */
818         return details;
819 }
820
821 int librpma_fio_server_init(struct thread_data *td)
822 {
823         struct librpma_fio_options_values *o = td->eo;
824         struct librpma_fio_server_data *csd;
825         struct ibv_context *dev = NULL;
826         enum rpma_log_level log_level_aux = RPMA_LOG_LEVEL_WARNING;
827         int ret = -1;
828
829         /* --debug=net sets RPMA_LOG_THRESHOLD_AUX to RPMA_LOG_LEVEL_INFO */
830 #ifdef FIO_INC_DEBUG
831         if ((1UL << FD_NET) & fio_debug)
832                 log_level_aux = RPMA_LOG_LEVEL_INFO;
833 #endif
834
835         /* configure logging thresholds to see more details */
836         rpma_log_set_threshold(RPMA_LOG_THRESHOLD, RPMA_LOG_LEVEL_INFO);
837         rpma_log_set_threshold(RPMA_LOG_THRESHOLD_AUX, log_level_aux);
838
839
840         /* obtain an IBV context for a remote IP address */
841         if ((ret = rpma_utils_get_ibv_context(o->server_ip,
842                         RPMA_UTIL_IBV_CONTEXT_LOCAL, &dev))) {
843                 librpma_td_verror(td, ret, "rpma_utils_get_ibv_context");
844                 return -1;
845         }
846
847         /* allocate server's data */
848         csd = calloc(1, sizeof(*csd));
849         if (csd == NULL) {
850                 td_verror(td, errno, "calloc");
851                 return -1;
852         }
853
854         /* create a new peer object */
855         if ((ret = rpma_peer_new(dev, &csd->peer))) {
856                 librpma_td_verror(td, ret, "rpma_peer_new");
857                 goto err_free_csd;
858         }
859
860         td->io_ops_data = csd;
861
862         return 0;
863
864 err_free_csd:
865         free(csd);
866
867         return -1;
868 }
869
870 void librpma_fio_server_cleanup(struct thread_data *td)
871 {
872         struct librpma_fio_server_data *csd =  td->io_ops_data;
873         int ret;
874
875         if (csd == NULL)
876                 return;
877
878         /* free the peer */
879         if ((ret = rpma_peer_delete(&csd->peer)))
880                 librpma_td_verror(td, ret, "rpma_peer_delete");
881
882         free(csd);
883 }
884
885 int librpma_fio_server_open_file(struct thread_data *td, struct fio_file *f,
886                 struct rpma_conn_cfg *cfg)
887 {
888         struct librpma_fio_server_data *csd = td->io_ops_data;
889         struct librpma_fio_options_values *o = td->eo;
890         enum rpma_conn_event conn_event = RPMA_CONN_UNDEFINED;
891         struct librpma_fio_workspace ws = {0};
892         struct rpma_conn_private_data pdata;
893         uint32_t max_msg_num;
894         struct rpma_conn_req *conn_req;
895         struct rpma_conn *conn;
896         struct rpma_mr_local *mr;
897         char port_td[LIBRPMA_FIO_PORT_STR_LEN_MAX];
898         struct rpma_ep *ep;
899         size_t mem_size = td->o.size;
900         size_t mr_desc_size;
901         void *ws_ptr;
902         bool is_dram;
903         int usage_mem_type;
904         int ret;
905
906         if (!f->file_name) {
907                 log_err("fio: filename is not set\n");
908                 return -1;
909         }
910
911         /* start a listening endpoint at addr:port */
912         if (librpma_fio_td_port(o->port, td, port_td))
913                 return -1;
914
915         if ((ret = rpma_ep_listen(csd->peer, o->server_ip, port_td, &ep))) {
916                 librpma_td_verror(td, ret, "rpma_ep_listen");
917                 return -1;
918         }
919
920         is_dram = !strcmp(f->file_name, "malloc");
921         if (is_dram) {
922                 /* allocation from DRAM using posix_memalign() */
923                 ws_ptr = librpma_fio_allocate_dram(td, mem_size, &csd->mem);
924                 usage_mem_type = RPMA_MR_USAGE_FLUSH_TYPE_VISIBILITY;
925         } else {
926                 /* allocation from PMEM using pmem_map_file() */
927                 ws_ptr = librpma_fio_allocate_pmem(td, f, mem_size, &csd->mem);
928                 usage_mem_type = RPMA_MR_USAGE_FLUSH_TYPE_PERSISTENT;
929         }
930
931         if (ws_ptr == NULL)
932                 goto err_ep_shutdown;
933
934         f->real_file_size = mem_size;
935
936         if ((ret = rpma_mr_reg(csd->peer, ws_ptr, mem_size,
937                         RPMA_MR_USAGE_READ_DST | RPMA_MR_USAGE_READ_SRC |
938                         RPMA_MR_USAGE_WRITE_DST | RPMA_MR_USAGE_WRITE_SRC |
939                         usage_mem_type, &mr))) {
940                 librpma_td_verror(td, ret, "rpma_mr_reg");
941                 goto err_free;
942         }
943
944         if (!is_dram && f->filetype == FIO_TYPE_FILE) {
945                 ret = rpma_mr_advise(mr, 0, mem_size,
946                                 IBV_ADVISE_MR_ADVICE_PREFETCH_WRITE,
947                                 IBV_ADVISE_MR_FLAG_FLUSH);
948                 if (ret) {
949                         librpma_td_verror(td, ret, "rpma_mr_advise");
950                         /* an invalid argument is an error */
951                         if (ret == RPMA_E_INVAL)
952                                 goto err_mr_dereg;
953
954                         /* log_err used instead of log_info to avoid corruption of the JSON output */
955                         log_err("Note: having rpma_mr_advise(3) failed because of RPMA_E_NOSUPP or RPMA_E_PROVIDER may come with a performance penalty, but it is not a blocker for running the benchmark.\n");
956                 }
957         }
958
959         /* get size of the memory region's descriptor */
960         if ((ret = rpma_mr_get_descriptor_size(mr, &mr_desc_size))) {
961                 librpma_td_verror(td, ret, "rpma_mr_get_descriptor_size");
962                 goto err_mr_dereg;
963         }
964
965         /* verify size of the memory region's descriptor */
966         if (mr_desc_size > LIBRPMA_FIO_DESCRIPTOR_MAX_SIZE) {
967                 log_err(
968                         "size of the memory region's descriptor is too big (max=%i)\n",
969                         LIBRPMA_FIO_DESCRIPTOR_MAX_SIZE);
970                 goto err_mr_dereg;
971         }
972
973         /* get the memory region's descriptor */
974         if ((ret = rpma_mr_get_descriptor(mr, &ws.descriptor[0]))) {
975                 librpma_td_verror(td, ret, "rpma_mr_get_descriptor");
976                 goto err_mr_dereg;
977         }
978
979         if (cfg != NULL) {
980                 if ((ret = rpma_conn_cfg_get_rq_size(cfg, &max_msg_num))) {
981                         librpma_td_verror(td, ret, "rpma_conn_cfg_get_rq_size");
982                         goto err_mr_dereg;
983                 }
984
985                 /* verify whether iodepth fits into uint16_t */
986                 if (max_msg_num > UINT16_MAX) {
987                         log_err("fio: iodepth too big (%u > %u)\n",
988                                 max_msg_num, UINT16_MAX);
989                         return -1;
990                 }
991
992                 ws.max_msg_num = max_msg_num;
993         }
994
995         /* prepare a workspace description */
996         ws.direct_write_to_pmem = o->direct_write_to_pmem;
997         ws.mr_desc_size = mr_desc_size;
998         pdata.ptr = &ws;
999         pdata.len = sizeof(ws);
1000
1001         /* receive an incoming connection request */
1002         if ((ret = rpma_ep_next_conn_req(ep, cfg, &conn_req))) {
1003                 librpma_td_verror(td, ret, "rpma_ep_next_conn_req");
1004                 goto err_mr_dereg;
1005         }
1006
1007         if (csd->prepare_connection && csd->prepare_connection(td, conn_req))
1008                 goto err_req_delete;
1009
1010         /* accept the connection request and obtain the connection object */
1011         if ((ret = rpma_conn_req_connect(&conn_req, &pdata, &conn))) {
1012                 librpma_td_verror(td, ret, "rpma_conn_req_connect");
1013                 goto err_req_delete;
1014         }
1015
1016         /* wait for the connection to be established */
1017         if ((ret = rpma_conn_next_event(conn, &conn_event))) {
1018                 librpma_td_verror(td, ret, "rpma_conn_next_event");
1019                 goto err_conn_delete;
1020         } else if (conn_event != RPMA_CONN_ESTABLISHED) {
1021                 log_err("rpma_conn_next_event returned an unexptected event\n");
1022                 goto err_conn_delete;
1023         }
1024
1025         /* end-point is no longer needed */
1026         (void) rpma_ep_shutdown(&ep);
1027
1028         csd->ws_mr = mr;
1029         csd->ws_ptr = ws_ptr;
1030         csd->conn = conn;
1031
1032         return 0;
1033
1034 err_conn_delete:
1035         (void) rpma_conn_delete(&conn);
1036
1037 err_req_delete:
1038         (void) rpma_conn_req_delete(&conn_req);
1039
1040 err_mr_dereg:
1041         (void) rpma_mr_dereg(&mr);
1042
1043 err_free:
1044         librpma_fio_free(&csd->mem);
1045
1046 err_ep_shutdown:
1047         (void) rpma_ep_shutdown(&ep);
1048
1049         return -1;
1050 }
1051
1052 int librpma_fio_server_close_file(struct thread_data *td, struct fio_file *f)
1053 {
1054         struct librpma_fio_server_data *csd = td->io_ops_data;
1055         enum rpma_conn_event conn_event = RPMA_CONN_UNDEFINED;
1056         int rv = 0;
1057         int ret;
1058
1059         /* wait for the connection to be closed */
1060         ret = rpma_conn_next_event(csd->conn, &conn_event);
1061         if (!ret && conn_event != RPMA_CONN_CLOSED) {
1062                 log_err("rpma_conn_next_event returned an unexptected event\n");
1063                 rv = -1;
1064         }
1065
1066         if ((ret = rpma_conn_disconnect(csd->conn))) {
1067                 librpma_td_verror(td, ret, "rpma_conn_disconnect");
1068                 rv = -1;
1069         }
1070
1071         if ((ret = rpma_conn_delete(&csd->conn))) {
1072                 librpma_td_verror(td, ret, "rpma_conn_delete");
1073                 rv = -1;
1074         }
1075
1076         if ((ret = rpma_mr_dereg(&csd->ws_mr))) {
1077                 librpma_td_verror(td, ret, "rpma_mr_dereg");
1078                 rv = -1;
1079         }
1080
1081         librpma_fio_free(&csd->mem);
1082
1083         return rv;
1084 }