rpma: update RPMA engines with new librpma completions API
[fio.git] / engines / librpma_fio.c
1 /*
2  * librpma_fio: librpma_apm and librpma_gpspm engines' common part.
3  *
4  * Copyright 2021, Intel Corporation
5  *
6  * This program is free software; you can redistribute it and/or
7  * modify it under the terms of the GNU General Public License,
8  * version 2 as published by the Free Software Foundation..
9  *
10  * This program is distributed in the hope that it will be useful,
11  * but WITHOUT ANY WARRANTY; without even the implied warranty of
12  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
13  * GNU General Public License for more details.
14  */
15
16 #include "librpma_fio.h"
17
18 #include <libpmem.h>
19
20 struct fio_option librpma_fio_options[] = {
21         {
22                 .name   = "serverip",
23                 .lname  = "rpma_server_ip",
24                 .type   = FIO_OPT_STR_STORE,
25                 .off1   = offsetof(struct librpma_fio_options_values, server_ip),
26                 .help   = "IP address the server is listening on",
27                 .def    = "",
28                 .category = FIO_OPT_C_ENGINE,
29                 .group  = FIO_OPT_G_LIBRPMA,
30         },
31         {
32                 .name   = "port",
33                 .lname  = "rpma_server port",
34                 .type   = FIO_OPT_STR_STORE,
35                 .off1   = offsetof(struct librpma_fio_options_values, port),
36                 .help   = "port the server is listening on",
37                 .def    = "7204",
38                 .category = FIO_OPT_C_ENGINE,
39                 .group  = FIO_OPT_G_LIBRPMA,
40         },
41         {
42                 .name   = "direct_write_to_pmem",
43                 .lname  = "Direct Write to PMem (via RDMA) from the remote host is possible",
44                 .type   = FIO_OPT_BOOL,
45                 .off1   = offsetof(struct librpma_fio_options_values,
46                                         direct_write_to_pmem),
47                 .help   = "Set to true ONLY when Direct Write to PMem from the remote host is possible (https://pmem.io/rpma/documentation/basic-direct-write-to-pmem.html)",
48                 .def    = "",
49                 .category = FIO_OPT_C_ENGINE,
50                 .group  = FIO_OPT_G_LIBRPMA,
51         },
52         {
53                 .name   = "busy_wait_polling",
54                 .lname  = "Set to 0 to wait for completion instead of busy-wait polling completion.",
55                 .type   = FIO_OPT_BOOL,
56                 .off1   = offsetof(struct librpma_fio_options_values,
57                                         busy_wait_polling),
58                 .help   = "Set to false if you want to reduce CPU usage",
59                 .def    = "1",
60                 .category = FIO_OPT_C_ENGINE,
61                 .group  = FIO_OPT_G_LIBRPMA,
62         },
63         {
64                 .name   = NULL,
65         },
66 };
67
68 int librpma_fio_td_port(const char *port_base_str, struct thread_data *td,
69                 char *port_out)
70 {
71         unsigned long int port_ul = strtoul(port_base_str, NULL, 10);
72         unsigned int port_new;
73
74         port_out[0] = '\0';
75
76         if (port_ul == ULONG_MAX) {
77                 td_verror(td, errno, "strtoul");
78                 return -1;
79         }
80         port_ul += td->thread_number - 1;
81         if (port_ul >= UINT_MAX) {
82                 log_err("[%u] port number (%lu) bigger than UINT_MAX\n",
83                         td->thread_number, port_ul);
84                 return -1;
85         }
86
87         port_new = port_ul;
88         snprintf(port_out, LIBRPMA_FIO_PORT_STR_LEN_MAX - 1, "%u", port_new);
89
90         return 0;
91 }
92
93 char *librpma_fio_allocate_dram(struct thread_data *td, size_t size,
94         struct librpma_fio_mem *mem)
95 {
96         char *mem_ptr = NULL;
97         int ret;
98
99         if ((ret = posix_memalign((void **)&mem_ptr, page_size, size))) {
100                 log_err("fio: posix_memalign() failed\n");
101                 td_verror(td, ret, "posix_memalign");
102                 return NULL;
103         }
104
105         mem->mem_ptr = mem_ptr;
106         mem->size_mmap = 0;
107
108         return mem_ptr;
109 }
110
111 char *librpma_fio_allocate_pmem(struct thread_data *td, struct fio_file *f,
112                 size_t size, struct librpma_fio_mem *mem)
113 {
114         size_t size_mmap = 0;
115         char *mem_ptr = NULL;
116         int is_pmem = 0;
117         size_t ws_offset;
118
119         if (size % page_size) {
120                 log_err("fio: size (%zu) is not aligned to page size (%zu)\n",
121                         size, page_size);
122                 return NULL;
123         }
124
125         if (f->filetype == FIO_TYPE_CHAR) {
126                 /* Each thread uses a separate offset within DeviceDAX. */
127                 ws_offset = (td->thread_number - 1) * size;
128         } else {
129                 /* Each thread uses a separate FileSystemDAX file. No offset is needed. */
130                 ws_offset = 0;
131         }
132
133         if (!f->file_name) {
134                 log_err("fio: filename is not set\n");
135                 return NULL;
136         }
137
138         /* map the file */
139         mem_ptr = pmem_map_file(f->file_name, 0 /* len */, 0 /* flags */,
140                         0 /* mode */, &size_mmap, &is_pmem);
141         if (mem_ptr == NULL) {
142                 log_err("fio: pmem_map_file(%s) failed\n", f->file_name);
143                 /* pmem_map_file() sets errno on failure */
144                 td_verror(td, errno, "pmem_map_file");
145                 return NULL;
146         }
147
148         /* pmem is expected */
149         if (!is_pmem) {
150                 log_err("fio: %s is not located in persistent memory\n",
151                         f->file_name);
152                 goto err_unmap;
153         }
154
155         /* check size of allocated persistent memory */
156         if (size_mmap < ws_offset + size) {
157                 log_err(
158                         "fio: %s is too small to handle so many threads (%zu < %zu)\n",
159                         f->file_name, size_mmap, ws_offset + size);
160                 goto err_unmap;
161         }
162
163         log_info("fio: size of memory mapped from the file %s: %zu\n",
164                 f->file_name, size_mmap);
165
166         mem->mem_ptr = mem_ptr;
167         mem->size_mmap = size_mmap;
168
169         return mem_ptr + ws_offset;
170
171 err_unmap:
172         (void) pmem_unmap(mem_ptr, size_mmap);
173         return NULL;
174 }
175
176 void librpma_fio_free(struct librpma_fio_mem *mem)
177 {
178         if (mem->size_mmap)
179                 (void) pmem_unmap(mem->mem_ptr, mem->size_mmap);
180         else
181                 free(mem->mem_ptr);
182 }
183
184 #define LIBRPMA_FIO_RETRY_MAX_NO        10
185 #define LIBRPMA_FIO_RETRY_DELAY_S       5
186
187 int librpma_fio_client_init(struct thread_data *td,
188                 struct rpma_conn_cfg *cfg)
189 {
190         struct librpma_fio_client_data *ccd;
191         struct librpma_fio_options_values *o = td->eo;
192         struct ibv_context *dev = NULL;
193         char port_td[LIBRPMA_FIO_PORT_STR_LEN_MAX];
194         struct rpma_conn_req *req = NULL;
195         enum rpma_conn_event event;
196         struct rpma_conn_private_data pdata;
197         enum rpma_log_level log_level_aux = RPMA_LOG_LEVEL_WARNING;
198         int remote_flush_type;
199         int retry;
200         int ret;
201
202         /* --debug=net sets RPMA_LOG_THRESHOLD_AUX to RPMA_LOG_LEVEL_INFO */
203 #ifdef FIO_INC_DEBUG
204         if ((1UL << FD_NET) & fio_debug)
205                 log_level_aux = RPMA_LOG_LEVEL_INFO;
206 #endif
207
208         /* configure logging thresholds to see more details */
209         rpma_log_set_threshold(RPMA_LOG_THRESHOLD, RPMA_LOG_LEVEL_INFO);
210         rpma_log_set_threshold(RPMA_LOG_THRESHOLD_AUX, log_level_aux);
211
212         /* obtain an IBV context for a remote IP address */
213         if ((ret = rpma_utils_get_ibv_context(o->server_ip,
214                         RPMA_UTIL_IBV_CONTEXT_REMOTE, &dev))) {
215                 librpma_td_verror(td, ret, "rpma_utils_get_ibv_context");
216                 return -1;
217         }
218
219         /* allocate client's data */
220         ccd = calloc(1, sizeof(*ccd));
221         if (ccd == NULL) {
222                 td_verror(td, errno, "calloc");
223                 return -1;
224         }
225
226         /* allocate all in-memory queues */
227         ccd->io_us_queued = calloc(td->o.iodepth, sizeof(*ccd->io_us_queued));
228         if (ccd->io_us_queued == NULL) {
229                 td_verror(td, errno, "calloc");
230                 goto err_free_ccd;
231         }
232
233         ccd->io_us_flight = calloc(td->o.iodepth, sizeof(*ccd->io_us_flight));
234         if (ccd->io_us_flight == NULL) {
235                 td_verror(td, errno, "calloc");
236                 goto err_free_io_u_queues;
237         }
238
239         ccd->io_us_completed = calloc(td->o.iodepth,
240                         sizeof(*ccd->io_us_completed));
241         if (ccd->io_us_completed == NULL) {
242                 td_verror(td, errno, "calloc");
243                 goto err_free_io_u_queues;
244         }
245
246         /* create a new peer object */
247         if ((ret = rpma_peer_new(dev, &ccd->peer))) {
248                 librpma_td_verror(td, ret, "rpma_peer_new");
249                 goto err_free_io_u_queues;
250         }
251
252         /* create a connection request */
253         if (librpma_fio_td_port(o->port, td, port_td))
254                 goto err_peer_delete;
255
256         for (retry = 0; retry < LIBRPMA_FIO_RETRY_MAX_NO; retry++) {
257                 if ((ret = rpma_conn_req_new(ccd->peer, o->server_ip, port_td,
258                                 cfg, &req))) {
259                         librpma_td_verror(td, ret, "rpma_conn_req_new");
260                         goto err_peer_delete;
261                 }
262
263                 /*
264                  * Connect the connection request
265                  * and obtain the connection object.
266                  */
267                 if ((ret = rpma_conn_req_connect(&req, NULL, &ccd->conn))) {
268                         librpma_td_verror(td, ret, "rpma_conn_req_connect");
269                         goto err_req_delete;
270                 }
271
272                 /* wait for the connection to establish */
273                 if ((ret = rpma_conn_next_event(ccd->conn, &event))) {
274                         librpma_td_verror(td, ret, "rpma_conn_next_event");
275                         goto err_conn_delete;
276                 } else if (event == RPMA_CONN_ESTABLISHED) {
277                         break;
278                 } else if (event == RPMA_CONN_REJECTED) {
279                         (void) rpma_conn_disconnect(ccd->conn);
280                         (void) rpma_conn_delete(&ccd->conn);
281                         if (retry < LIBRPMA_FIO_RETRY_MAX_NO - 1) {
282                                 log_err("Thread [%d]: Retrying (#%i) ...\n",
283                                         td->thread_number, retry + 1);
284                                 sleep(LIBRPMA_FIO_RETRY_DELAY_S);
285                         } else {
286                                 log_err(
287                                         "Thread [%d]: The maximum number of retries exceeded. Closing.\n",
288                                         td->thread_number);
289                         }
290                 } else {
291                         log_err(
292                                 "rpma_conn_next_event returned an unexptected event: (%s != RPMA_CONN_ESTABLISHED)\n",
293                                 rpma_utils_conn_event_2str(event));
294                         goto err_conn_delete;
295                 }
296         }
297
298         if (retry > 0)
299                 log_err("Thread [%d]: Connected after retry #%i\n",
300                         td->thread_number, retry);
301
302         if (ccd->conn == NULL)
303                 goto err_peer_delete;
304
305         /* get the connection's main CQ */
306         if ((ret = rpma_conn_get_cq(ccd->conn, &ccd->cq))) {
307                 librpma_td_verror(td, ret, "rpma_conn_get_cq");
308                 goto err_conn_delete;
309         }
310
311         /* get the connection's private data sent from the server */
312         if ((ret = rpma_conn_get_private_data(ccd->conn, &pdata))) {
313                 librpma_td_verror(td, ret, "rpma_conn_get_private_data");
314                 goto err_conn_delete;
315         }
316
317         /* get the server's workspace representation */
318         ccd->ws = pdata.ptr;
319
320         /* create the server's memory representation */
321         if ((ret = rpma_mr_remote_from_descriptor(&ccd->ws->descriptor[0],
322                         ccd->ws->mr_desc_size, &ccd->server_mr))) {
323                 librpma_td_verror(td, ret, "rpma_mr_remote_from_descriptor");
324                 goto err_conn_delete;
325         }
326
327         /* get the total size of the shared server memory */
328         if ((ret = rpma_mr_remote_get_size(ccd->server_mr, &ccd->ws_size))) {
329                 librpma_td_verror(td, ret, "rpma_mr_remote_get_size");
330                 goto err_conn_delete;
331         }
332
333         /* get flush type of the remote node */
334         if ((ret = rpma_mr_remote_get_flush_type(ccd->server_mr,
335                         &remote_flush_type))) {
336                 librpma_td_verror(td, ret, "rpma_mr_remote_get_flush_type");
337                 goto err_conn_delete;
338         }
339
340         ccd->server_mr_flush_type =
341                 (remote_flush_type & RPMA_MR_USAGE_FLUSH_TYPE_PERSISTENT) ?
342                 RPMA_FLUSH_TYPE_PERSISTENT : RPMA_FLUSH_TYPE_VISIBILITY;
343
344         /*
345          * Assure an io_us buffer allocation is page-size-aligned which is required
346          * to register for RDMA. User-provided value is intentionally ignored.
347          */
348         td->o.mem_align = page_size;
349
350         td->io_ops_data = ccd;
351
352         return 0;
353
354 err_conn_delete:
355         (void) rpma_conn_disconnect(ccd->conn);
356         (void) rpma_conn_delete(&ccd->conn);
357
358 err_req_delete:
359         (void) rpma_conn_req_delete(&req);
360
361 err_peer_delete:
362         (void) rpma_peer_delete(&ccd->peer);
363
364 err_free_io_u_queues:
365         free(ccd->io_us_queued);
366         free(ccd->io_us_flight);
367         free(ccd->io_us_completed);
368
369 err_free_ccd:
370         free(ccd);
371
372         return -1;
373 }
374
375 void librpma_fio_client_cleanup(struct thread_data *td)
376 {
377         struct librpma_fio_client_data *ccd = td->io_ops_data;
378         enum rpma_conn_event ev;
379         int ret;
380
381         if (ccd == NULL)
382                 return;
383
384         /* delete the iou's memory registration */
385         if ((ret = rpma_mr_dereg(&ccd->orig_mr)))
386                 librpma_td_verror(td, ret, "rpma_mr_dereg");
387         /* delete the iou's memory registration */
388         if ((ret = rpma_mr_remote_delete(&ccd->server_mr)))
389                 librpma_td_verror(td, ret, "rpma_mr_remote_delete");
390         /* initiate disconnection */
391         if ((ret = rpma_conn_disconnect(ccd->conn)))
392                 librpma_td_verror(td, ret, "rpma_conn_disconnect");
393         /* wait for disconnection to end up */
394         if ((ret = rpma_conn_next_event(ccd->conn, &ev))) {
395                 librpma_td_verror(td, ret, "rpma_conn_next_event");
396         } else if (ev != RPMA_CONN_CLOSED) {
397                 log_err(
398                         "client_cleanup received an unexpected event (%s != RPMA_CONN_CLOSED)\n",
399                         rpma_utils_conn_event_2str(ev));
400         }
401         /* delete the connection */
402         if ((ret = rpma_conn_delete(&ccd->conn)))
403                 librpma_td_verror(td, ret, "rpma_conn_delete");
404         /* delete the peer */
405         if ((ret = rpma_peer_delete(&ccd->peer)))
406                 librpma_td_verror(td, ret, "rpma_peer_delete");
407         /* free the software queues */
408         free(ccd->io_us_queued);
409         free(ccd->io_us_flight);
410         free(ccd->io_us_completed);
411         free(ccd);
412         td->io_ops_data = NULL; /* zero ccd */
413 }
414
415 int librpma_fio_file_nop(struct thread_data *td, struct fio_file *f)
416 {
417         /* NOP */
418         return 0;
419 }
420
421 int librpma_fio_client_post_init(struct thread_data *td)
422 {
423         struct librpma_fio_client_data *ccd =  td->io_ops_data;
424         size_t io_us_size;
425         int ret;
426
427         /*
428          * td->orig_buffer is not aligned. The engine requires aligned io_us
429          * so FIO alignes up the address using the formula below.
430          */
431         ccd->orig_buffer_aligned = PTR_ALIGN(td->orig_buffer, page_mask) +
432                         td->o.mem_align;
433
434         /*
435          * td->orig_buffer_size beside the space really consumed by io_us
436          * has paddings which can be omitted for the memory registration.
437          */
438         io_us_size = (unsigned long long)td_max_bs(td) *
439                         (unsigned long long)td->o.iodepth;
440
441         if ((ret = rpma_mr_reg(ccd->peer, ccd->orig_buffer_aligned, io_us_size,
442                         RPMA_MR_USAGE_READ_DST | RPMA_MR_USAGE_READ_SRC |
443                         RPMA_MR_USAGE_WRITE_DST | RPMA_MR_USAGE_WRITE_SRC |
444                         RPMA_MR_USAGE_FLUSH_TYPE_PERSISTENT, &ccd->orig_mr)))
445                 librpma_td_verror(td, ret, "rpma_mr_reg");
446         return ret;
447 }
448
449 int librpma_fio_client_get_file_size(struct thread_data *td,
450                 struct fio_file *f)
451 {
452         struct librpma_fio_client_data *ccd = td->io_ops_data;
453
454         f->real_file_size = ccd->ws_size;
455         fio_file_set_size_known(f);
456
457         return 0;
458 }
459
460 static enum fio_q_status client_queue_sync(struct thread_data *td,
461                 struct io_u *io_u)
462 {
463         struct librpma_fio_client_data *ccd = td->io_ops_data;
464         struct ibv_wc wc;
465         unsigned io_u_index;
466         int ret;
467
468         /* execute io_u */
469         if (io_u->ddir == DDIR_READ) {
470                 /* post an RDMA read operation */
471                 if (librpma_fio_client_io_read(td, io_u,
472                                 RPMA_F_COMPLETION_ALWAYS))
473                         goto err;
474         } else if (io_u->ddir == DDIR_WRITE) {
475                 /* post an RDMA write operation */
476                 if (librpma_fio_client_io_write(td, io_u))
477                         goto err;
478                 if (ccd->flush(td, io_u, io_u, io_u->xfer_buflen))
479                         goto err;
480         } else {
481                 log_err("unsupported IO mode: %s\n", io_ddir_name(io_u->ddir));
482                 goto err;
483         }
484
485         do {
486                 /* get a completion */
487                 ret = rpma_cq_get_wc(ccd->cq, 1, &wc, NULL);
488                 if (ret == RPMA_E_NO_COMPLETION) {
489                         /* lack of completion is not an error */
490                         continue;
491                 } else if (ret != 0) {
492                         /* an error occurred */
493                         librpma_td_verror(td, ret, "rpma_cq_get_wc");
494                         goto err;
495                 }
496
497                 /* if io_us has completed with an error */
498                 if (wc.status != IBV_WC_SUCCESS)
499                         goto err;
500
501                 if (wc.opcode == IBV_WC_SEND)
502                         ++ccd->op_send_completed;
503                 else {
504                         if (wc.opcode == IBV_WC_RECV)
505                                 ++ccd->op_recv_completed;
506
507                         break;
508                 }
509         } while (1);
510
511         if (ccd->get_io_u_index(&wc, &io_u_index) != 1)
512                 goto err;
513
514         if (io_u->index != io_u_index) {
515                 log_err(
516                         "no matching io_u for received completion found (io_u_index=%u)\n",
517                         io_u_index);
518                 goto err;
519         }
520
521         /* make sure all SENDs are completed before exit - clean up SQ */
522         if (librpma_fio_client_io_complete_all_sends(td))
523                 goto err;
524
525         return FIO_Q_COMPLETED;
526
527 err:
528         io_u->error = -1;
529         return FIO_Q_COMPLETED;
530 }
531
532 enum fio_q_status librpma_fio_client_queue(struct thread_data *td,
533                 struct io_u *io_u)
534 {
535         struct librpma_fio_client_data *ccd = td->io_ops_data;
536
537         if (ccd->io_u_queued_nr == (int)td->o.iodepth)
538                 return FIO_Q_BUSY;
539
540         if (td->o.sync_io)
541                 return client_queue_sync(td, io_u);
542
543         /* io_u -> queued[] */
544         ccd->io_us_queued[ccd->io_u_queued_nr] = io_u;
545         ccd->io_u_queued_nr++;
546
547         return FIO_Q_QUEUED;
548 }
549
550 int librpma_fio_client_commit(struct thread_data *td)
551 {
552         struct librpma_fio_client_data *ccd = td->io_ops_data;
553         int flags = RPMA_F_COMPLETION_ON_ERROR;
554         struct timespec now;
555         bool fill_time;
556         int i;
557         struct io_u *flush_first_io_u = NULL;
558         unsigned long long int flush_len = 0;
559
560         if (!ccd->io_us_queued)
561                 return -1;
562
563         /* execute all io_us from queued[] */
564         for (i = 0; i < ccd->io_u_queued_nr; i++) {
565                 struct io_u *io_u = ccd->io_us_queued[i];
566
567                 if (io_u->ddir == DDIR_READ) {
568                         if (i + 1 == ccd->io_u_queued_nr ||
569                             ccd->io_us_queued[i + 1]->ddir == DDIR_WRITE)
570                                 flags = RPMA_F_COMPLETION_ALWAYS;
571                         /* post an RDMA read operation */
572                         if (librpma_fio_client_io_read(td, io_u, flags))
573                                 return -1;
574                 } else if (io_u->ddir == DDIR_WRITE) {
575                         /* post an RDMA write operation */
576                         if (librpma_fio_client_io_write(td, io_u))
577                                 return -1;
578
579                         /* cache the first io_u in the sequence */
580                         if (flush_first_io_u == NULL)
581                                 flush_first_io_u = io_u;
582
583                         /*
584                          * the flush length is the sum of all io_u's creating
585                          * the sequence
586                          */
587                         flush_len += io_u->xfer_buflen;
588
589                         /*
590                          * if io_u's are random the rpma_flush is required
591                          * after each one of them
592                          */
593                         if (!td_random(td)) {
594                                 /*
595                                  * When the io_u's are sequential and
596                                  * the current io_u is not the last one and
597                                  * the next one is also a write operation
598                                  * the flush can be postponed by one io_u and
599                                  * cover all of them which build a continuous
600                                  * sequence.
601                                  */
602                                 if ((i + 1 < ccd->io_u_queued_nr) &&
603                                     (ccd->io_us_queued[i + 1]->ddir == DDIR_WRITE))
604                                         continue;
605                         }
606
607                         /* flush all writes which build a continuous sequence */
608                         if (ccd->flush(td, flush_first_io_u, io_u, flush_len))
609                                 return -1;
610
611                         /*
612                          * reset the flush parameters in preparation for
613                          * the next one
614                          */
615                         flush_first_io_u = NULL;
616                         flush_len = 0;
617                 } else {
618                         log_err("unsupported IO mode: %s\n",
619                                 io_ddir_name(io_u->ddir));
620                         return -1;
621                 }
622         }
623
624         if ((fill_time = fio_fill_issue_time(td)))
625                 fio_gettime(&now, NULL);
626
627         /* move executed io_us from queued[] to flight[] */
628         for (i = 0; i < ccd->io_u_queued_nr; i++) {
629                 struct io_u *io_u = ccd->io_us_queued[i];
630
631                 /* FIO does not do this if the engine is asynchronous */
632                 if (fill_time)
633                         memcpy(&io_u->issue_time, &now, sizeof(now));
634
635                 /* move executed io_us from queued[] to flight[] */
636                 ccd->io_us_flight[ccd->io_u_flight_nr] = io_u;
637                 ccd->io_u_flight_nr++;
638
639                 /*
640                  * FIO says:
641                  * If an engine has the commit hook
642                  * it has to call io_u_queued() itself.
643                  */
644                 io_u_queued(td, io_u);
645         }
646
647         /* FIO does not do this if an engine has the commit hook. */
648         io_u_mark_submit(td, ccd->io_u_queued_nr);
649         ccd->io_u_queued_nr = 0;
650
651         return 0;
652 }
653
654 /*
655  * RETURN VALUE
656  * - > 0  - a number of completed io_us
657  * -   0  - when no complicitions received
658  * - (-1) - when an error occurred
659  */
660 static int client_getevent_process(struct thread_data *td)
661 {
662         struct librpma_fio_client_data *ccd = td->io_ops_data;
663         struct ibv_wc wc;
664         /* io_u->index of completed io_u (wc.wr_id) */
665         unsigned int io_u_index;
666         /* # of completed io_us */
667         int cmpl_num = 0;
668         /* helpers */
669         struct io_u *io_u;
670         int i;
671         int ret;
672
673         /* get a completion */
674         if ((ret = rpma_cq_get_wc(ccd->cq, 1, &wc, NULL))) {
675                 /* lack of completion is not an error */
676                 if (ret == RPMA_E_NO_COMPLETION) {
677                         /* lack of completion is not an error */
678                         return 0;
679                 }
680
681                 /* an error occurred */
682                 librpma_td_verror(td, ret, "rpma_cq_get_wc");
683                 return -1;
684         }
685
686         /* if io_us has completed with an error */
687         if (wc.status != IBV_WC_SUCCESS) {
688                 td->error = wc.status;
689                 return -1;
690         }
691
692         if (wc.opcode == IBV_WC_SEND)
693                 ++ccd->op_send_completed;
694         else if (wc.opcode == IBV_WC_RECV)
695                 ++ccd->op_recv_completed;
696
697         if ((ret = ccd->get_io_u_index(&wc, &io_u_index)) != 1)
698                 return ret;
699
700         /* look for an io_u being completed */
701         for (i = 0; i < ccd->io_u_flight_nr; ++i) {
702                 if (ccd->io_us_flight[i]->index == io_u_index) {
703                         cmpl_num = i + 1;
704                         break;
705                 }
706         }
707
708         /* if no matching io_u has been found */
709         if (cmpl_num == 0) {
710                 log_err(
711                         "no matching io_u for received completion found (io_u_index=%u)\n",
712                         io_u_index);
713                 return -1;
714         }
715
716         /* move completed io_us to the completed in-memory queue */
717         for (i = 0; i < cmpl_num; ++i) {
718                 /* get and prepare io_u */
719                 io_u = ccd->io_us_flight[i];
720
721                 /* append to the queue */
722                 ccd->io_us_completed[ccd->io_u_completed_nr] = io_u;
723                 ccd->io_u_completed_nr++;
724         }
725
726         /* remove completed io_us from the flight queue */
727         for (i = cmpl_num; i < ccd->io_u_flight_nr; ++i)
728                 ccd->io_us_flight[i - cmpl_num] = ccd->io_us_flight[i];
729         ccd->io_u_flight_nr -= cmpl_num;
730
731         return cmpl_num;
732 }
733
734 int librpma_fio_client_getevents(struct thread_data *td, unsigned int min,
735                 unsigned int max, const struct timespec *t)
736 {
737         struct librpma_fio_client_data *ccd = td->io_ops_data;
738         /* total # of completed io_us */
739         int cmpl_num_total = 0;
740         /* # of completed io_us from a single event */
741         int cmpl_num;
742
743         do {
744                 cmpl_num = client_getevent_process(td);
745                 if (cmpl_num > 0) {
746                         /* new completions collected */
747                         cmpl_num_total += cmpl_num;
748                 } else if (cmpl_num == 0) {
749                         /*
750                          * It is required to make sure that CQEs for SENDs
751                          * will flow at least at the same pace as CQEs for RECVs.
752                          */
753                         if (cmpl_num_total >= min &&
754                             ccd->op_send_completed >= ccd->op_recv_completed)
755                                 break;
756
757                         /*
758                          * To reduce CPU consumption one can use
759                          * the rpma_cq_wait() function.
760                          * Note this greatly increase the latency
761                          * and make the results less stable.
762                          * The bandwidth stays more or less the same.
763                          */
764                 } else {
765                         /* an error occurred */
766                         return -1;
767                 }
768
769                 /*
770                  * The expected max can be exceeded if CQEs for RECVs will come up
771                  * faster than CQEs for SENDs. But it is required to make sure CQEs for
772                  * SENDs will flow at least at the same pace as CQEs for RECVs.
773                  */
774         } while (cmpl_num_total < max ||
775                         ccd->op_send_completed < ccd->op_recv_completed);
776
777         /*
778          * All posted SENDs are completed and RECVs for them (responses) are
779          * completed. This is the initial situation so the counters are reset.
780          */
781         if (ccd->op_send_posted == ccd->op_send_completed &&
782                         ccd->op_send_completed == ccd->op_recv_completed) {
783                 ccd->op_send_posted = 0;
784                 ccd->op_send_completed = 0;
785                 ccd->op_recv_completed = 0;
786         }
787
788         return cmpl_num_total;
789 }
790
791 struct io_u *librpma_fio_client_event(struct thread_data *td, int event)
792 {
793         struct librpma_fio_client_data *ccd = td->io_ops_data;
794         struct io_u *io_u;
795         int i;
796
797         /* get the first io_u from the queue */
798         io_u = ccd->io_us_completed[0];
799
800         /* remove the first io_u from the queue */
801         for (i = 1; i < ccd->io_u_completed_nr; ++i)
802                 ccd->io_us_completed[i - 1] = ccd->io_us_completed[i];
803         ccd->io_u_completed_nr--;
804
805         dprint_io_u(io_u, "client_event");
806
807         return io_u;
808 }
809
810 char *librpma_fio_client_errdetails(struct io_u *io_u)
811 {
812         /* get the string representation of an error */
813         enum ibv_wc_status status = io_u->error;
814         const char *status_str = ibv_wc_status_str(status);
815
816         char *details = strdup(status_str);
817         if (details == NULL) {
818                 fprintf(stderr, "Error: %s\n", status_str);
819                 fprintf(stderr, "Fatal error: out of memory. Aborting.\n");
820                 abort();
821         }
822
823         /* FIO frees the returned string when it becomes obsolete */
824         return details;
825 }
826
827 int librpma_fio_server_init(struct thread_data *td)
828 {
829         struct librpma_fio_options_values *o = td->eo;
830         struct librpma_fio_server_data *csd;
831         struct ibv_context *dev = NULL;
832         enum rpma_log_level log_level_aux = RPMA_LOG_LEVEL_WARNING;
833         int ret = -1;
834
835         /* --debug=net sets RPMA_LOG_THRESHOLD_AUX to RPMA_LOG_LEVEL_INFO */
836 #ifdef FIO_INC_DEBUG
837         if ((1UL << FD_NET) & fio_debug)
838                 log_level_aux = RPMA_LOG_LEVEL_INFO;
839 #endif
840
841         /* configure logging thresholds to see more details */
842         rpma_log_set_threshold(RPMA_LOG_THRESHOLD, RPMA_LOG_LEVEL_INFO);
843         rpma_log_set_threshold(RPMA_LOG_THRESHOLD_AUX, log_level_aux);
844
845
846         /* obtain an IBV context for a remote IP address */
847         if ((ret = rpma_utils_get_ibv_context(o->server_ip,
848                         RPMA_UTIL_IBV_CONTEXT_LOCAL, &dev))) {
849                 librpma_td_verror(td, ret, "rpma_utils_get_ibv_context");
850                 return -1;
851         }
852
853         /* allocate server's data */
854         csd = calloc(1, sizeof(*csd));
855         if (csd == NULL) {
856                 td_verror(td, errno, "calloc");
857                 return -1;
858         }
859
860         /* create a new peer object */
861         if ((ret = rpma_peer_new(dev, &csd->peer))) {
862                 librpma_td_verror(td, ret, "rpma_peer_new");
863                 goto err_free_csd;
864         }
865
866         td->io_ops_data = csd;
867
868         return 0;
869
870 err_free_csd:
871         free(csd);
872
873         return -1;
874 }
875
876 void librpma_fio_server_cleanup(struct thread_data *td)
877 {
878         struct librpma_fio_server_data *csd =  td->io_ops_data;
879         int ret;
880
881         if (csd == NULL)
882                 return;
883
884         /* free the peer */
885         if ((ret = rpma_peer_delete(&csd->peer)))
886                 librpma_td_verror(td, ret, "rpma_peer_delete");
887
888         free(csd);
889 }
890
891 int librpma_fio_server_open_file(struct thread_data *td, struct fio_file *f,
892                 struct rpma_conn_cfg *cfg)
893 {
894         struct librpma_fio_server_data *csd = td->io_ops_data;
895         struct librpma_fio_options_values *o = td->eo;
896         enum rpma_conn_event conn_event = RPMA_CONN_UNDEFINED;
897         struct librpma_fio_workspace ws = {0};
898         struct rpma_conn_private_data pdata;
899         uint32_t max_msg_num;
900         struct rpma_conn_req *conn_req;
901         struct rpma_conn *conn;
902         struct rpma_mr_local *mr;
903         char port_td[LIBRPMA_FIO_PORT_STR_LEN_MAX];
904         struct rpma_ep *ep;
905         size_t mem_size = td->o.size;
906         size_t mr_desc_size;
907         void *ws_ptr;
908         bool is_dram;
909         int usage_mem_type;
910         int ret;
911
912         if (!f->file_name) {
913                 log_err("fio: filename is not set\n");
914                 return -1;
915         }
916
917         /* start a listening endpoint at addr:port */
918         if (librpma_fio_td_port(o->port, td, port_td))
919                 return -1;
920
921         if ((ret = rpma_ep_listen(csd->peer, o->server_ip, port_td, &ep))) {
922                 librpma_td_verror(td, ret, "rpma_ep_listen");
923                 return -1;
924         }
925
926         is_dram = !strcmp(f->file_name, "malloc");
927         if (is_dram) {
928                 /* allocation from DRAM using posix_memalign() */
929                 ws_ptr = librpma_fio_allocate_dram(td, mem_size, &csd->mem);
930                 usage_mem_type = RPMA_MR_USAGE_FLUSH_TYPE_VISIBILITY;
931         } else {
932                 /* allocation from PMEM using pmem_map_file() */
933                 ws_ptr = librpma_fio_allocate_pmem(td, f, mem_size, &csd->mem);
934                 usage_mem_type = RPMA_MR_USAGE_FLUSH_TYPE_PERSISTENT;
935         }
936
937         if (ws_ptr == NULL)
938                 goto err_ep_shutdown;
939
940         f->real_file_size = mem_size;
941
942         if ((ret = rpma_mr_reg(csd->peer, ws_ptr, mem_size,
943                         RPMA_MR_USAGE_READ_DST | RPMA_MR_USAGE_READ_SRC |
944                         RPMA_MR_USAGE_WRITE_DST | RPMA_MR_USAGE_WRITE_SRC |
945                         usage_mem_type, &mr))) {
946                 librpma_td_verror(td, ret, "rpma_mr_reg");
947                 goto err_free;
948         }
949
950         if (!is_dram && f->filetype == FIO_TYPE_FILE) {
951                 ret = rpma_mr_advise(mr, 0, mem_size,
952                                 IBV_ADVISE_MR_ADVICE_PREFETCH_WRITE,
953                                 IBV_ADVISE_MR_FLAG_FLUSH);
954                 if (ret) {
955                         librpma_td_verror(td, ret, "rpma_mr_advise");
956                         /* an invalid argument is an error */
957                         if (ret == RPMA_E_INVAL)
958                                 goto err_mr_dereg;
959
960                         /* log_err used instead of log_info to avoid corruption of the JSON output */
961                         log_err("Note: having rpma_mr_advise(3) failed because of RPMA_E_NOSUPP or RPMA_E_PROVIDER may come with a performance penalty, but it is not a blocker for running the benchmark.\n");
962                 }
963         }
964
965         /* get size of the memory region's descriptor */
966         if ((ret = rpma_mr_get_descriptor_size(mr, &mr_desc_size))) {
967                 librpma_td_verror(td, ret, "rpma_mr_get_descriptor_size");
968                 goto err_mr_dereg;
969         }
970
971         /* verify size of the memory region's descriptor */
972         if (mr_desc_size > LIBRPMA_FIO_DESCRIPTOR_MAX_SIZE) {
973                 log_err(
974                         "size of the memory region's descriptor is too big (max=%i)\n",
975                         LIBRPMA_FIO_DESCRIPTOR_MAX_SIZE);
976                 goto err_mr_dereg;
977         }
978
979         /* get the memory region's descriptor */
980         if ((ret = rpma_mr_get_descriptor(mr, &ws.descriptor[0]))) {
981                 librpma_td_verror(td, ret, "rpma_mr_get_descriptor");
982                 goto err_mr_dereg;
983         }
984
985         if (cfg != NULL) {
986                 if ((ret = rpma_conn_cfg_get_rq_size(cfg, &max_msg_num))) {
987                         librpma_td_verror(td, ret, "rpma_conn_cfg_get_rq_size");
988                         goto err_mr_dereg;
989                 }
990
991                 /* verify whether iodepth fits into uint16_t */
992                 if (max_msg_num > UINT16_MAX) {
993                         log_err("fio: iodepth too big (%u > %u)\n",
994                                 max_msg_num, UINT16_MAX);
995                         return -1;
996                 }
997
998                 ws.max_msg_num = max_msg_num;
999         }
1000
1001         /* prepare a workspace description */
1002         ws.direct_write_to_pmem = o->direct_write_to_pmem;
1003         ws.mr_desc_size = mr_desc_size;
1004         pdata.ptr = &ws;
1005         pdata.len = sizeof(ws);
1006
1007         /* receive an incoming connection request */
1008         if ((ret = rpma_ep_next_conn_req(ep, cfg, &conn_req))) {
1009                 librpma_td_verror(td, ret, "rpma_ep_next_conn_req");
1010                 goto err_mr_dereg;
1011         }
1012
1013         if (csd->prepare_connection && csd->prepare_connection(td, conn_req))
1014                 goto err_req_delete;
1015
1016         /* accept the connection request and obtain the connection object */
1017         if ((ret = rpma_conn_req_connect(&conn_req, &pdata, &conn))) {
1018                 librpma_td_verror(td, ret, "rpma_conn_req_connect");
1019                 goto err_req_delete;
1020         }
1021
1022         /* wait for the connection to be established */
1023         if ((ret = rpma_conn_next_event(conn, &conn_event))) {
1024                 librpma_td_verror(td, ret, "rpma_conn_next_event");
1025                 goto err_conn_delete;
1026         } else if (conn_event != RPMA_CONN_ESTABLISHED) {
1027                 log_err("rpma_conn_next_event returned an unexptected event\n");
1028                 goto err_conn_delete;
1029         }
1030
1031         /* end-point is no longer needed */
1032         (void) rpma_ep_shutdown(&ep);
1033
1034         csd->ws_mr = mr;
1035         csd->ws_ptr = ws_ptr;
1036         csd->conn = conn;
1037
1038         /* get the connection's main CQ */
1039         if ((ret = rpma_conn_get_cq(csd->conn, &csd->cq))) {
1040                 librpma_td_verror(td, ret, "rpma_conn_get_cq");
1041                 goto err_conn_delete;
1042         }
1043
1044         return 0;
1045
1046 err_conn_delete:
1047         (void) rpma_conn_delete(&conn);
1048
1049 err_req_delete:
1050         (void) rpma_conn_req_delete(&conn_req);
1051
1052 err_mr_dereg:
1053         (void) rpma_mr_dereg(&mr);
1054
1055 err_free:
1056         librpma_fio_free(&csd->mem);
1057
1058 err_ep_shutdown:
1059         (void) rpma_ep_shutdown(&ep);
1060
1061         return -1;
1062 }
1063
1064 int librpma_fio_server_close_file(struct thread_data *td, struct fio_file *f)
1065 {
1066         struct librpma_fio_server_data *csd = td->io_ops_data;
1067         enum rpma_conn_event conn_event = RPMA_CONN_UNDEFINED;
1068         int rv = 0;
1069         int ret;
1070
1071         /* wait for the connection to be closed */
1072         ret = rpma_conn_next_event(csd->conn, &conn_event);
1073         if (!ret && conn_event != RPMA_CONN_CLOSED) {
1074                 log_err("rpma_conn_next_event returned an unexptected event\n");
1075                 rv = -1;
1076         }
1077
1078         if ((ret = rpma_conn_disconnect(csd->conn))) {
1079                 librpma_td_verror(td, ret, "rpma_conn_disconnect");
1080                 rv = -1;
1081         }
1082
1083         if ((ret = rpma_conn_delete(&csd->conn))) {
1084                 librpma_td_verror(td, ret, "rpma_conn_delete");
1085                 rv = -1;
1086         }
1087
1088         if ((ret = rpma_mr_dereg(&csd->ws_mr))) {
1089                 librpma_td_verror(td, ret, "rpma_mr_dereg");
1090                 rv = -1;
1091         }
1092
1093         librpma_fio_free(&csd->mem);
1094
1095         return rv;
1096 }