t/nvmept_trim: increase transfer size for some tests
[fio.git] / engines / librpma_gpspm.c
1 /*
2  * librpma_gpspm: IO engine that uses PMDK librpma to write data,
3  *              based on General Purpose Server Persistency Method
4  *
5  * Copyright 2020-2022, Intel Corporation
6  *
7  * This program is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU General Public License,
9  * version 2 as published by the Free Software Foundation..
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  */
16
17 #include "librpma_fio.h"
18
19 #ifdef CONFIG_LIBPMEM2_INSTALLED
20 #include <libpmem2.h>
21 #else
22 #include <libpmem.h>
23 #endif
24
25 /* Generated by the protocol buffer compiler from: librpma_gpspm_flush.proto */
26 #include "librpma_gpspm_flush.pb-c.h"
27
28 #define MAX_MSG_SIZE (512)
29 #define IO_U_BUF_LEN (2 * MAX_MSG_SIZE)
30 #define SEND_OFFSET (0)
31 #define RECV_OFFSET (SEND_OFFSET + MAX_MSG_SIZE)
32
33 #define GPSPM_FLUSH_REQUEST__LAST \
34         { PROTOBUF_C_MESSAGE_INIT(&gpspm_flush_request__descriptor), 0, 0, 0 }
35
36 /*
37  * 'Flush_req_last' is the last flush request
38  * the client has to send to server to indicate
39  * that the client is done.
40  */
41 static const GPSPMFlushRequest Flush_req_last = GPSPM_FLUSH_REQUEST__LAST;
42
43 #define IS_NOT_THE_LAST_MESSAGE(flush_req) \
44         (flush_req->length != Flush_req_last.length || \
45         flush_req->offset != Flush_req_last.offset)
46
47 /* client side implementation */
48
49 /* get next io_u message buffer in the round-robin fashion */
50 #define IO_U_NEXT_BUF_OFF_CLIENT(cd) \
51         (IO_U_BUF_LEN * ((cd->msg_curr++) % cd->msg_num))
52
53 struct client_data {
54         /* memory for sending and receiving buffered */
55         char *io_us_msgs;
56
57         /* resources for messaging buffer */
58         uint32_t msg_num;
59         uint32_t msg_curr;
60         struct rpma_mr_local *msg_mr;
61 };
62
63 static inline int client_io_flush(struct thread_data *td,
64                 struct io_u *first_io_u, struct io_u *last_io_u,
65                 unsigned long long int len);
66
67 static int client_get_io_u_index(struct ibv_wc *wc, unsigned int *io_u_index);
68
69 static int client_init(struct thread_data *td)
70 {
71         struct librpma_fio_client_data *ccd;
72         struct client_data *cd;
73         uint32_t write_num;
74         struct rpma_conn_cfg *cfg = NULL;
75         int ret;
76
77         /*
78          * not supported:
79          * - readwrite = read / trim / randread / randtrim /
80          *               / rw / randrw / trimwrite
81          */
82         if (td_read(td) || td_trim(td)) {
83                 td_verror(td, EINVAL, "Not supported mode.");
84                 return -1;
85         }
86
87         /* allocate client's data */
88         cd = calloc(1, sizeof(*cd));
89         if (cd == NULL) {
90                 td_verror(td, errno, "calloc");
91                 return -1;
92         }
93
94         /*
95          * Calculate the required number of WRITEs and FLUSHes.
96          *
97          * Note: Each flush is a request (SEND) and response (RECV) pair.
98          */
99         if (td_random(td)) {
100                 write_num = td->o.iodepth; /* WRITE * N */
101                 cd->msg_num = td->o.iodepth; /* FLUSH * N */
102         } else {
103                 if (td->o.sync_io) {
104                         write_num = 1; /* WRITE */
105                         cd->msg_num = 1; /* FLUSH */
106                 } else {
107                         write_num = td->o.iodepth; /* WRITE * N */
108                         /*
109                          * FLUSH * B where:
110                          * - B == ceil(iodepth / iodepth_batch)
111                          *   which is the number of batches for N writes
112                          */
113                         cd->msg_num = LIBRPMA_FIO_CEIL(td->o.iodepth,
114                                         td->o.iodepth_batch);
115                 }
116         }
117
118         /* create a connection configuration object */
119         if ((ret = rpma_conn_cfg_new(&cfg))) {
120                 librpma_td_verror(td, ret, "rpma_conn_cfg_new");
121                 goto err_free_cd;
122         }
123
124         /*
125          * Calculate the required queue sizes where:
126          * - the send queue (SQ) has to be big enough to accommodate
127          *   all io_us (WRITEs) and all flush requests (SENDs)
128          * - the receive queue (RQ) has to be big enough to accommodate
129          *   all flush responses (RECVs)
130          * - the completion queue (CQ) has to be big enough to accommodate all
131          *   success and error completions (sq_size + rq_size)
132          */
133         if ((ret = rpma_conn_cfg_set_sq_size(cfg, write_num + cd->msg_num))) {
134                 librpma_td_verror(td, ret, "rpma_conn_cfg_set_sq_size");
135                 goto err_cfg_delete;
136         }
137         if ((ret = rpma_conn_cfg_set_rq_size(cfg, cd->msg_num))) {
138                 librpma_td_verror(td, ret, "rpma_conn_cfg_set_rq_size");
139                 goto err_cfg_delete;
140         }
141         if ((ret = rpma_conn_cfg_set_cq_size(cfg, write_num + cd->msg_num * 2))) {
142                 librpma_td_verror(td, ret, "rpma_conn_cfg_set_cq_size");
143                 goto err_cfg_delete;
144         }
145
146         if (librpma_fio_client_init(td, cfg))
147                 goto err_cfg_delete;
148
149         ccd = td->io_ops_data;
150
151         if (ccd->ws->direct_write_to_pmem &&
152             ccd->server_mr_flush_type == RPMA_FLUSH_TYPE_PERSISTENT &&
153             td->thread_number == 1) {
154                 /* XXX log_info mixes with the JSON output */
155                 log_err(
156                         "Note: The server side supports Direct Write to PMem and it is equipped with PMem (direct_write_to_pmem).\n"
157                         "You can use librpma_client and librpma_server engines for better performance instead of GPSPM.\n");
158         }
159
160         /* validate the server's RQ capacity */
161         if (cd->msg_num > ccd->ws->max_msg_num) {
162                 log_err(
163                         "server's RQ size (iodepth) too small to handle the client's workspace requirements (%u < %u)\n",
164                         ccd->ws->max_msg_num, cd->msg_num);
165                 goto err_cleanup_common;
166         }
167
168         if ((ret = rpma_conn_cfg_delete(&cfg))) {
169                 librpma_td_verror(td, ret, "rpma_conn_cfg_delete");
170                 /* non fatal error - continue */
171         }
172
173         ccd->flush = client_io_flush;
174         ccd->get_io_u_index = client_get_io_u_index;
175         ccd->client_data = cd;
176
177         return 0;
178
179 err_cleanup_common:
180         librpma_fio_client_cleanup(td);
181
182 err_cfg_delete:
183         (void) rpma_conn_cfg_delete(&cfg);
184
185 err_free_cd:
186         free(cd);
187
188         return -1;
189 }
190
191 static int client_post_init(struct thread_data *td)
192 {
193         struct librpma_fio_client_data *ccd = td->io_ops_data;
194         struct client_data *cd = ccd->client_data;
195         unsigned int io_us_msgs_size;
196         int ret;
197
198         /* message buffers initialization and registration */
199         io_us_msgs_size = cd->msg_num * IO_U_BUF_LEN;
200         if ((ret = posix_memalign((void **)&cd->io_us_msgs, page_size,
201                         io_us_msgs_size))) {
202                 td_verror(td, ret, "posix_memalign");
203                 return ret;
204         }
205         if ((ret = rpma_mr_reg(ccd->peer, cd->io_us_msgs, io_us_msgs_size,
206                         RPMA_MR_USAGE_SEND | RPMA_MR_USAGE_RECV,
207                         &cd->msg_mr))) {
208                 librpma_td_verror(td, ret, "rpma_mr_reg");
209                 return ret;
210         }
211
212         return librpma_fio_client_post_init(td);
213 }
214
215 static void client_cleanup(struct thread_data *td)
216 {
217         struct librpma_fio_client_data *ccd = td->io_ops_data;
218         struct client_data *cd;
219         size_t flush_req_size;
220         size_t io_u_buf_off;
221         size_t send_offset;
222         void *send_ptr;
223         int ret;
224
225         if (ccd == NULL)
226                 return;
227
228         cd = ccd->client_data;
229         if (cd == NULL) {
230                 librpma_fio_client_cleanup(td);
231                 return;
232         }
233
234         /*
235          * Make sure all SEND completions are collected ergo there are free
236          * slots in the SQ for the last SEND message.
237          *
238          * Note: If any operation will fail we still can send the termination
239          * notice.
240          */
241         (void) librpma_fio_client_io_complete_all_sends(td);
242
243         /* prepare the last flush message and pack it to the send buffer */
244         flush_req_size = gpspm_flush_request__get_packed_size(&Flush_req_last);
245         if (flush_req_size > MAX_MSG_SIZE) {
246                 log_err(
247                         "Packed flush request size is bigger than available send buffer space (%zu > %d\n",
248                         flush_req_size, MAX_MSG_SIZE);
249         } else {
250                 io_u_buf_off = IO_U_NEXT_BUF_OFF_CLIENT(cd);
251                 send_offset = io_u_buf_off + SEND_OFFSET;
252                 send_ptr = cd->io_us_msgs + send_offset;
253                 (void) gpspm_flush_request__pack(&Flush_req_last, send_ptr);
254
255                 /* send the flush message */
256                 if ((ret = rpma_send(ccd->conn, cd->msg_mr, send_offset,
257                                 flush_req_size, RPMA_F_COMPLETION_ALWAYS,
258                                 NULL)))
259                         librpma_td_verror(td, ret, "rpma_send");
260
261                 ++ccd->op_send_posted;
262
263                 /* Wait for the SEND to complete */
264                 (void) librpma_fio_client_io_complete_all_sends(td);
265         }
266
267         /* deregister the messaging buffer memory */
268         if ((ret = rpma_mr_dereg(&cd->msg_mr)))
269                 librpma_td_verror(td, ret, "rpma_mr_dereg");
270
271         free(ccd->client_data);
272
273         librpma_fio_client_cleanup(td);
274 }
275
276 static inline int client_io_flush(struct thread_data *td,
277                 struct io_u *first_io_u, struct io_u *last_io_u,
278                 unsigned long long int len)
279 {
280         struct librpma_fio_client_data *ccd = td->io_ops_data;
281         struct client_data *cd = ccd->client_data;
282         size_t io_u_buf_off = IO_U_NEXT_BUF_OFF_CLIENT(cd);
283         size_t send_offset = io_u_buf_off + SEND_OFFSET;
284         size_t recv_offset = io_u_buf_off + RECV_OFFSET;
285         void *send_ptr = cd->io_us_msgs + send_offset;
286         void *recv_ptr = cd->io_us_msgs + recv_offset;
287         GPSPMFlushRequest flush_req = GPSPM_FLUSH_REQUEST__INIT;
288         size_t flush_req_size = 0;
289         int ret;
290
291         /* prepare a response buffer */
292         if ((ret = rpma_recv(ccd->conn, cd->msg_mr, recv_offset, MAX_MSG_SIZE,
293                         recv_ptr))) {
294                 librpma_td_verror(td, ret, "rpma_recv");
295                 return -1;
296         }
297
298         /* prepare a flush message and pack it to a send buffer */
299         flush_req.offset = first_io_u->offset;
300         flush_req.length = len;
301         flush_req.op_context = last_io_u->index;
302         flush_req_size = gpspm_flush_request__get_packed_size(&flush_req);
303         if (flush_req_size > MAX_MSG_SIZE) {
304                 log_err(
305                         "Packed flush request size is bigger than available send buffer space (%"
306                         PRIu64 " > %d\n", flush_req_size, MAX_MSG_SIZE);
307                 return -1;
308         }
309         (void) gpspm_flush_request__pack(&flush_req, send_ptr);
310
311         /* send the flush message */
312         if ((ret = rpma_send(ccd->conn, cd->msg_mr, send_offset, flush_req_size,
313                         RPMA_F_COMPLETION_ALWAYS, NULL))) {
314                 librpma_td_verror(td, ret, "rpma_send");
315                 return -1;
316         }
317
318         ++ccd->op_send_posted;
319
320         return 0;
321 }
322
323 static int client_get_io_u_index(struct ibv_wc *wc, unsigned int *io_u_index)
324 {
325         GPSPMFlushResponse *flush_resp;
326
327         if (wc->opcode != IBV_WC_RECV)
328                 return 0;
329
330         /* unpack a response from the received buffer */
331         flush_resp = gpspm_flush_response__unpack(NULL,
332                         wc->byte_len, (void *)wc->wr_id);
333         if (flush_resp == NULL) {
334                 log_err("Cannot unpack the flush response buffer\n");
335                 return -1;
336         }
337
338         memcpy(io_u_index, &flush_resp->op_context, sizeof(*io_u_index));
339
340         gpspm_flush_response__free_unpacked(flush_resp, NULL);
341
342         return 1;
343 }
344
345 FIO_STATIC struct ioengine_ops ioengine_client = {
346         .name                   = "librpma_gpspm_client",
347         .version                = FIO_IOOPS_VERSION,
348         .init                   = client_init,
349         .post_init              = client_post_init,
350         .get_file_size          = librpma_fio_client_get_file_size,
351         .open_file              = librpma_fio_file_nop,
352         .queue                  = librpma_fio_client_queue,
353         .commit                 = librpma_fio_client_commit,
354         .getevents              = librpma_fio_client_getevents,
355         .event                  = librpma_fio_client_event,
356         .errdetails             = librpma_fio_client_errdetails,
357         .close_file             = librpma_fio_file_nop,
358         .cleanup                = client_cleanup,
359         .flags                  = FIO_DISKLESSIO | FIO_ASYNCIO_SETS_ISSUE_TIME,
360         .options                = librpma_fio_options,
361         .option_struct_size     = sizeof(struct librpma_fio_options_values),
362 };
363
364 /* server side implementation */
365
366 #define IO_U_BUFF_OFF_SERVER(i) (i * IO_U_BUF_LEN)
367
368 typedef void (*librpma_fio_persist_fn)(const void *ptr, size_t size);
369
370 struct server_data {
371         /* aligned td->orig_buffer */
372         char *orig_buffer_aligned;
373
374         /* resources for messaging buffer from DRAM allocated by fio */
375         struct rpma_mr_local *msg_mr;
376
377         uint32_t msg_sqe_available; /* # of free SQ slots */
378
379         /* in-memory queues */
380         struct ibv_wc *msgs_queued;
381         uint32_t msg_queued_nr;
382
383         librpma_fio_persist_fn persist;
384 };
385
386 static int server_init(struct thread_data *td)
387 {
388         struct librpma_fio_server_data *csd;
389         struct server_data *sd;
390         int ret = -1;
391
392         if ((ret = librpma_fio_server_init(td)))
393                 return ret;
394
395         csd = td->io_ops_data;
396
397         /* allocate server's data */
398         sd = calloc(1, sizeof(*sd));
399         if (sd == NULL) {
400                 td_verror(td, errno, "calloc");
401                 goto err_server_cleanup;
402         }
403
404         /* allocate in-memory queue */
405         sd->msgs_queued = calloc(td->o.iodepth, sizeof(*sd->msgs_queued));
406         if (sd->msgs_queued == NULL) {
407                 td_verror(td, errno, "calloc");
408                 goto err_free_sd;
409         }
410
411 #ifdef CONFIG_LIBPMEM2_INSTALLED
412         /* get libpmem2 persist function from pmem2_map */
413         sd->persist = pmem2_get_persist_fn(csd->mem.map);
414 #else
415         sd->persist = pmem_persist;
416 #endif
417
418         /*
419          * Assure a single io_u buffer can store both SEND and RECV messages and
420          * an io_us buffer allocation is page-size-aligned which is required
421          * to register for RDMA. User-provided values are intentionally ignored.
422          */
423         td->o.max_bs[DDIR_READ] = IO_U_BUF_LEN;
424         td->o.mem_align = page_size;
425
426         csd->server_data = sd;
427
428         return 0;
429
430 err_free_sd:
431         free(sd);
432
433 err_server_cleanup:
434         librpma_fio_server_cleanup(td);
435
436         return -1;
437 }
438
439 static int server_post_init(struct thread_data *td)
440 {
441         struct librpma_fio_server_data *csd = td->io_ops_data;
442         struct server_data *sd = csd->server_data;
443         size_t io_us_size;
444         size_t io_u_buflen;
445         int ret;
446
447         /*
448          * td->orig_buffer is not aligned. The engine requires aligned io_us
449          * so FIO aligns up the address using the formula below.
450          */
451         sd->orig_buffer_aligned = PTR_ALIGN(td->orig_buffer, page_mask) +
452                         td->o.mem_align;
453
454         /*
455          * XXX
456          * Each io_u message buffer contains recv and send messages.
457          * Aligning each of those buffers may potentially give
458          * some performance benefits.
459          */
460         io_u_buflen = td_max_bs(td);
461
462         /* check whether io_u buffer is big enough */
463         if (io_u_buflen < IO_U_BUF_LEN) {
464                 log_err(
465                         "blocksize too small to accommodate assumed maximal request/response pair size (%" PRIu64 " < %d)\n",
466                         io_u_buflen, IO_U_BUF_LEN);
467                 return -1;
468         }
469
470         /*
471          * td->orig_buffer_size beside the space really consumed by io_us
472          * has paddings which can be omitted for the memory registration.
473          */
474         io_us_size = (unsigned long long)io_u_buflen *
475                         (unsigned long long)td->o.iodepth;
476
477         if ((ret = rpma_mr_reg(csd->peer, sd->orig_buffer_aligned, io_us_size,
478                         RPMA_MR_USAGE_SEND | RPMA_MR_USAGE_RECV,
479                         &sd->msg_mr))) {
480                 librpma_td_verror(td, ret, "rpma_mr_reg");
481                 return -1;
482         }
483
484         return 0;
485 }
486
487 static void server_cleanup(struct thread_data *td)
488 {
489         struct librpma_fio_server_data *csd = td->io_ops_data;
490         struct server_data *sd;
491         int ret;
492
493         if (csd == NULL)
494                 return;
495
496         sd = csd->server_data;
497
498         if (sd != NULL) {
499                 /* rpma_mr_dereg(messaging buffer from DRAM) */
500                 if ((ret = rpma_mr_dereg(&sd->msg_mr)))
501                         librpma_td_verror(td, ret, "rpma_mr_dereg");
502
503                 free(sd->msgs_queued);
504                 free(sd);
505         }
506
507         librpma_fio_server_cleanup(td);
508 }
509
510 static int prepare_connection(struct thread_data *td,
511                 struct rpma_conn_req *conn_req)
512 {
513         struct librpma_fio_server_data *csd = td->io_ops_data;
514         struct server_data *sd = csd->server_data;
515         int ret;
516         int i;
517
518         /* prepare buffers for a flush requests */
519         sd->msg_sqe_available = td->o.iodepth;
520         for (i = 0; i < td->o.iodepth; i++) {
521                 size_t offset_recv_msg = IO_U_BUFF_OFF_SERVER(i) + RECV_OFFSET;
522                 if ((ret = rpma_conn_req_recv(conn_req, sd->msg_mr,
523                                 offset_recv_msg, MAX_MSG_SIZE,
524                                 (const void *)(uintptr_t)i))) {
525                         librpma_td_verror(td, ret, "rpma_conn_req_recv");
526                         return ret;
527                 }
528         }
529
530         return 0;
531 }
532
533 static int server_open_file(struct thread_data *td, struct fio_file *f)
534 {
535         struct librpma_fio_server_data *csd = td->io_ops_data;
536         struct rpma_conn_cfg *cfg = NULL;
537         uint16_t max_msg_num = td->o.iodepth;
538         int ret;
539
540         csd->prepare_connection = prepare_connection;
541
542         /* create a connection configuration object */
543         if ((ret = rpma_conn_cfg_new(&cfg))) {
544                 librpma_td_verror(td, ret, "rpma_conn_cfg_new");
545                 return -1;
546         }
547
548         /*
549          * Calculate the required queue sizes where:
550          * - the send queue (SQ) has to be big enough to accommodate
551          *   all possible flush requests (SENDs)
552          * - the receive queue (RQ) has to be big enough to accommodate
553          *   all flush responses (RECVs)
554          * - the completion queue (CQ) has to be big enough to accommodate
555          *   all success and error completions (sq_size + rq_size)
556          */
557         if ((ret = rpma_conn_cfg_set_sq_size(cfg, max_msg_num))) {
558                 librpma_td_verror(td, ret, "rpma_conn_cfg_set_sq_size");
559                 goto err_cfg_delete;
560         }
561         if ((ret = rpma_conn_cfg_set_rq_size(cfg, max_msg_num))) {
562                 librpma_td_verror(td, ret, "rpma_conn_cfg_set_rq_size");
563                 goto err_cfg_delete;
564         }
565         if ((ret = rpma_conn_cfg_set_cq_size(cfg, max_msg_num * 2))) {
566                 librpma_td_verror(td, ret, "rpma_conn_cfg_set_cq_size");
567                 goto err_cfg_delete;
568         }
569
570         ret = librpma_fio_server_open_file(td, f, cfg);
571
572 err_cfg_delete:
573         (void) rpma_conn_cfg_delete(&cfg);
574
575         return ret;
576 }
577
578 static int server_qe_process(struct thread_data *td, struct ibv_wc *wc)
579 {
580         struct librpma_fio_server_data *csd = td->io_ops_data;
581         struct server_data *sd = csd->server_data;
582         GPSPMFlushRequest *flush_req;
583         GPSPMFlushResponse flush_resp = GPSPM_FLUSH_RESPONSE__INIT;
584         size_t flush_resp_size = 0;
585         size_t send_buff_offset;
586         size_t recv_buff_offset;
587         size_t io_u_buff_offset;
588         void *send_buff_ptr;
589         void *recv_buff_ptr;
590         void *op_ptr;
591         int msg_index;
592         int ret;
593
594         /* calculate SEND/RECV pair parameters */
595         msg_index = (int)(uintptr_t)wc->wr_id;
596         io_u_buff_offset = IO_U_BUFF_OFF_SERVER(msg_index);
597         send_buff_offset = io_u_buff_offset + SEND_OFFSET;
598         recv_buff_offset = io_u_buff_offset + RECV_OFFSET;
599         send_buff_ptr = sd->orig_buffer_aligned + send_buff_offset;
600         recv_buff_ptr = sd->orig_buffer_aligned + recv_buff_offset;
601
602         /* unpack a flush request from the received buffer */
603         flush_req = gpspm_flush_request__unpack(NULL, wc->byte_len,
604                         recv_buff_ptr);
605         if (flush_req == NULL) {
606                 log_err("cannot unpack the flush request buffer\n");
607                 goto err_terminate;
608         }
609
610         if (IS_NOT_THE_LAST_MESSAGE(flush_req)) {
611                 op_ptr = csd->ws_ptr + flush_req->offset;
612                 sd->persist(op_ptr, flush_req->length);
613         } else {
614                 /*
615                  * This is the last message - the client is done.
616                  */
617                 gpspm_flush_request__free_unpacked(flush_req, NULL);
618                 td->done = true;
619                 return 0;
620         }
621
622         /* initiate the next receive operation */
623         if ((ret = rpma_recv(csd->conn, sd->msg_mr, recv_buff_offset,
624                         MAX_MSG_SIZE,
625                         (const void *)(uintptr_t)msg_index))) {
626                 librpma_td_verror(td, ret, "rpma_recv");
627                 goto err_free_unpacked;
628         }
629
630         /* prepare a flush response and pack it to a send buffer */
631         flush_resp.op_context = flush_req->op_context;
632         flush_resp_size = gpspm_flush_response__get_packed_size(&flush_resp);
633         if (flush_resp_size > MAX_MSG_SIZE) {
634                 log_err(
635                         "Size of the packed flush response is bigger than the available space of the send buffer (%"
636                         PRIu64 " > %i\n", flush_resp_size, MAX_MSG_SIZE);
637                 goto err_free_unpacked;
638         }
639
640         (void) gpspm_flush_response__pack(&flush_resp, send_buff_ptr);
641
642         /* send the flush response */
643         if ((ret = rpma_send(csd->conn, sd->msg_mr, send_buff_offset,
644                         flush_resp_size, RPMA_F_COMPLETION_ALWAYS, NULL))) {
645                 librpma_td_verror(td, ret, "rpma_send");
646                 goto err_free_unpacked;
647         }
648         --sd->msg_sqe_available;
649
650         gpspm_flush_request__free_unpacked(flush_req, NULL);
651
652         return 0;
653
654 err_free_unpacked:
655         gpspm_flush_request__free_unpacked(flush_req, NULL);
656
657 err_terminate:
658         td->terminate = true;
659
660         return -1;
661 }
662
663 static inline int server_queue_process(struct thread_data *td)
664 {
665         struct librpma_fio_server_data *csd = td->io_ops_data;
666         struct server_data *sd = csd->server_data;
667         int ret;
668         int i;
669
670         /* min(# of queue entries, # of SQ entries available) */
671         uint32_t qes_to_process = min(sd->msg_queued_nr, sd->msg_sqe_available);
672         if (qes_to_process == 0)
673                 return 0;
674
675         /* process queued completions */
676         for (i = 0; i < qes_to_process; ++i) {
677                 if ((ret = server_qe_process(td, &sd->msgs_queued[i])))
678                         return ret;
679         }
680
681         /* progress the queue */
682         for (i = 0; i < sd->msg_queued_nr - qes_to_process; ++i) {
683                 memcpy(&sd->msgs_queued[i],
684                         &sd->msgs_queued[qes_to_process + i],
685                         sizeof(sd->msgs_queued[i]));
686         }
687
688         sd->msg_queued_nr -= qes_to_process;
689
690         return 0;
691 }
692
693 static int server_cmpl_process(struct thread_data *td)
694 {
695         struct librpma_fio_server_data *csd = td->io_ops_data;
696         struct server_data *sd = csd->server_data;
697         struct ibv_wc *wc = &sd->msgs_queued[sd->msg_queued_nr];
698         struct librpma_fio_options_values *o = td->eo;
699         int ret;
700
701         ret = rpma_cq_get_wc(csd->cq, 1, wc, NULL);
702         if (ret == RPMA_E_NO_COMPLETION) {
703                 if (o->busy_wait_polling)
704                         return 0; /* lack of completion is not an error */
705
706                 ret = rpma_cq_wait(csd->cq);
707                 if (ret == RPMA_E_NO_COMPLETION)
708                         return 0; /* lack of completion is not an error */
709                 if (ret) {
710                         librpma_td_verror(td, ret, "rpma_cq_wait");
711                         goto err_terminate;
712                 }
713
714                 ret = rpma_cq_get_wc(csd->cq, 1, wc, NULL);
715                 if (ret == RPMA_E_NO_COMPLETION)
716                         return 0; /* lack of completion is not an error */
717                 if (ret) {
718                         librpma_td_verror(td, ret, "rpma_cq_get_wc");
719                         goto err_terminate;
720                 }
721         } else if (ret) {
722                 librpma_td_verror(td, ret, "rpma_cq_get_wc");
723                 goto err_terminate;
724         }
725
726         /* validate the completion */
727         if (wc->status != IBV_WC_SUCCESS)
728                 goto err_terminate;
729
730         if (wc->opcode == IBV_WC_RECV)
731                 ++sd->msg_queued_nr;
732         else if (wc->opcode == IBV_WC_SEND)
733                 ++sd->msg_sqe_available;
734
735         return 0;
736
737 err_terminate:
738         td->terminate = true;
739
740         return -1;
741 }
742
743 static enum fio_q_status server_queue(struct thread_data *td, struct io_u *io_u)
744 {
745         do {
746                 if (server_cmpl_process(td))
747                         return FIO_Q_BUSY;
748
749                 if (server_queue_process(td))
750                         return FIO_Q_BUSY;
751
752         } while (!td->done);
753
754         return FIO_Q_COMPLETED;
755 }
756
757 FIO_STATIC struct ioengine_ops ioengine_server = {
758         .name                   = "librpma_gpspm_server",
759         .version                = FIO_IOOPS_VERSION,
760         .init                   = server_init,
761         .post_init              = server_post_init,
762         .open_file              = server_open_file,
763         .close_file             = librpma_fio_server_close_file,
764         .queue                  = server_queue,
765         .invalidate             = librpma_fio_file_nop,
766         .cleanup                = server_cleanup,
767         .flags                  = FIO_SYNCIO,
768         .options                = librpma_fio_options,
769         .option_struct_size     = sizeof(struct librpma_fio_options_values),
770 };
771
772 /* register both engines */
773
774 static void fio_init fio_librpma_gpspm_register(void)
775 {
776         register_ioengine(&ioengine_client);
777         register_ioengine(&ioengine_server);
778 }
779
780 static void fio_exit fio_librpma_gpspm_unregister(void)
781 {
782         unregister_ioengine(&ioengine_client);
783         unregister_ioengine(&ioengine_server);
784 }