t/nvmept_trim: increase transfer size for some tests
[fio.git] / engines / librpma_gpspm.c
1 /*
2  * librpma_gpspm: IO engine that uses PMDK librpma to write data,
3  *              based on General Purpose Server Persistency Method
4  *
5  * Copyright 2020-2021, Intel Corporation
6  *
7  * This program is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU General Public License,
9  * version 2 as published by the Free Software Foundation..
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  */
16
17 #include "librpma_fio.h"
18
19 #include <libpmem.h>
20
21 /* Generated by the protocol buffer compiler from: librpma_gpspm_flush.proto */
22 #include "librpma_gpspm_flush.pb-c.h"
23
24 #define MAX_MSG_SIZE (512)
25 #define IO_U_BUF_LEN (2 * MAX_MSG_SIZE)
26 #define SEND_OFFSET (0)
27 #define RECV_OFFSET (SEND_OFFSET + MAX_MSG_SIZE)
28
29 #define GPSPM_FLUSH_REQUEST__LAST \
30         { PROTOBUF_C_MESSAGE_INIT(&gpspm_flush_request__descriptor), 0, 0, 0 }
31
32 /*
33  * 'Flush_req_last' is the last flush request
34  * the client has to send to server to indicate
35  * that the client is done.
36  */
37 static const GPSPMFlushRequest Flush_req_last = GPSPM_FLUSH_REQUEST__LAST;
38
39 #define IS_NOT_THE_LAST_MESSAGE(flush_req) \
40         (flush_req->length != Flush_req_last.length || \
41         flush_req->offset != Flush_req_last.offset)
42
43 /* client side implementation */
44
45 /* get next io_u message buffer in the round-robin fashion */
46 #define IO_U_NEXT_BUF_OFF_CLIENT(cd) \
47         (IO_U_BUF_LEN * ((cd->msg_curr++) % cd->msg_num))
48
49 struct client_data {
50         /* memory for sending and receiving buffered */
51         char *io_us_msgs;
52
53         /* resources for messaging buffer */
54         uint32_t msg_num;
55         uint32_t msg_curr;
56         struct rpma_mr_local *msg_mr;
57 };
58
59 static inline int client_io_flush(struct thread_data *td,
60                 struct io_u *first_io_u, struct io_u *last_io_u,
61                 unsigned long long int len);
62
63 static int client_get_io_u_index(struct rpma_completion *cmpl,
64                 unsigned int *io_u_index);
65
66 static int client_init(struct thread_data *td)
67 {
68         struct librpma_fio_client_data *ccd;
69         struct client_data *cd;
70         uint32_t write_num;
71         struct rpma_conn_cfg *cfg = NULL;
72         int ret;
73
74         /*
75          * not supported:
76          * - readwrite = read / trim / randread / randtrim /
77          *               / rw / randrw / trimwrite
78          */
79         if (td_read(td) || td_trim(td)) {
80                 td_verror(td, EINVAL, "Not supported mode.");
81                 return -1;
82         }
83
84         /* allocate client's data */
85         cd = calloc(1, sizeof(*cd));
86         if (cd == NULL) {
87                 td_verror(td, errno, "calloc");
88                 return -1;
89         }
90
91         /*
92          * Calculate the required number of WRITEs and FLUSHes.
93          *
94          * Note: Each flush is a request (SEND) and response (RECV) pair.
95          */
96         if (td_random(td)) {
97                 write_num = td->o.iodepth; /* WRITE * N */
98                 cd->msg_num = td->o.iodepth; /* FLUSH * N */
99         } else {
100                 if (td->o.sync_io) {
101                         write_num = 1; /* WRITE */
102                         cd->msg_num = 1; /* FLUSH */
103                 } else {
104                         write_num = td->o.iodepth; /* WRITE * N */
105                         /*
106                          * FLUSH * B where:
107                          * - B == ceil(iodepth / iodepth_batch)
108                          *   which is the number of batches for N writes
109                          */
110                         cd->msg_num = LIBRPMA_FIO_CEIL(td->o.iodepth,
111                                         td->o.iodepth_batch);
112                 }
113         }
114
115         /* create a connection configuration object */
116         if ((ret = rpma_conn_cfg_new(&cfg))) {
117                 librpma_td_verror(td, ret, "rpma_conn_cfg_new");
118                 goto err_free_cd;
119         }
120
121         /*
122          * Calculate the required queue sizes where:
123          * - the send queue (SQ) has to be big enough to accommodate
124          *   all io_us (WRITEs) and all flush requests (SENDs)
125          * - the receive queue (RQ) has to be big enough to accommodate
126          *   all flush responses (RECVs)
127          * - the completion queue (CQ) has to be big enough to accommodate all
128          *   success and error completions (sq_size + rq_size)
129          */
130         if ((ret = rpma_conn_cfg_set_sq_size(cfg, write_num + cd->msg_num))) {
131                 librpma_td_verror(td, ret, "rpma_conn_cfg_set_sq_size");
132                 goto err_cfg_delete;
133         }
134         if ((ret = rpma_conn_cfg_set_rq_size(cfg, cd->msg_num))) {
135                 librpma_td_verror(td, ret, "rpma_conn_cfg_set_rq_size");
136                 goto err_cfg_delete;
137         }
138         if ((ret = rpma_conn_cfg_set_cq_size(cfg, write_num + cd->msg_num * 2))) {
139                 librpma_td_verror(td, ret, "rpma_conn_cfg_set_cq_size");
140                 goto err_cfg_delete;
141         }
142
143         if (librpma_fio_client_init(td, cfg))
144                 goto err_cfg_delete;
145
146         ccd = td->io_ops_data;
147
148         if (ccd->ws->direct_write_to_pmem &&
149             ccd->server_mr_flush_type == RPMA_FLUSH_TYPE_PERSISTENT &&
150             td->thread_number == 1) {
151                 /* XXX log_info mixes with the JSON output */
152                 log_err(
153                         "Note: The server side supports Direct Write to PMem and it is equipped with PMem (direct_write_to_pmem).\n"
154                         "You can use librpma_client and librpma_server engines for better performance instead of GPSPM.\n");
155         }
156
157         /* validate the server's RQ capacity */
158         if (cd->msg_num > ccd->ws->max_msg_num) {
159                 log_err(
160                         "server's RQ size (iodepth) too small to handle the client's workspace requirements (%u < %u)\n",
161                         ccd->ws->max_msg_num, cd->msg_num);
162                 goto err_cleanup_common;
163         }
164
165         if ((ret = rpma_conn_cfg_delete(&cfg))) {
166                 librpma_td_verror(td, ret, "rpma_conn_cfg_delete");
167                 /* non fatal error - continue */
168         }
169
170         ccd->flush = client_io_flush;
171         ccd->get_io_u_index = client_get_io_u_index;
172         ccd->client_data = cd;
173
174         return 0;
175
176 err_cleanup_common:
177         librpma_fio_client_cleanup(td);
178
179 err_cfg_delete:
180         (void) rpma_conn_cfg_delete(&cfg);
181
182 err_free_cd:
183         free(cd);
184
185         return -1;
186 }
187
188 static int client_post_init(struct thread_data *td)
189 {
190         struct librpma_fio_client_data *ccd = td->io_ops_data;
191         struct client_data *cd = ccd->client_data;
192         unsigned int io_us_msgs_size;
193         int ret;
194
195         /* message buffers initialization and registration */
196         io_us_msgs_size = cd->msg_num * IO_U_BUF_LEN;
197         if ((ret = posix_memalign((void **)&cd->io_us_msgs, page_size,
198                         io_us_msgs_size))) {
199                 td_verror(td, ret, "posix_memalign");
200                 return ret;
201         }
202         if ((ret = rpma_mr_reg(ccd->peer, cd->io_us_msgs, io_us_msgs_size,
203                         RPMA_MR_USAGE_SEND | RPMA_MR_USAGE_RECV,
204                         &cd->msg_mr))) {
205                 librpma_td_verror(td, ret, "rpma_mr_reg");
206                 return ret;
207         }
208
209         return librpma_fio_client_post_init(td);
210 }
211
212 static void client_cleanup(struct thread_data *td)
213 {
214         struct librpma_fio_client_data *ccd = td->io_ops_data;
215         struct client_data *cd;
216         size_t flush_req_size;
217         size_t io_u_buf_off;
218         size_t send_offset;
219         void *send_ptr;
220         int ret;
221
222         if (ccd == NULL)
223                 return;
224
225         cd = ccd->client_data;
226         if (cd == NULL) {
227                 librpma_fio_client_cleanup(td);
228                 return;
229         }
230
231         /*
232          * Make sure all SEND completions are collected ergo there are free
233          * slots in the SQ for the last SEND message.
234          *
235          * Note: If any operation will fail we still can send the termination
236          * notice.
237          */
238         (void) librpma_fio_client_io_complete_all_sends(td);
239
240         /* prepare the last flush message and pack it to the send buffer */
241         flush_req_size = gpspm_flush_request__get_packed_size(&Flush_req_last);
242         if (flush_req_size > MAX_MSG_SIZE) {
243                 log_err(
244                         "Packed flush request size is bigger than available send buffer space (%zu > %d\n",
245                         flush_req_size, MAX_MSG_SIZE);
246         } else {
247                 io_u_buf_off = IO_U_NEXT_BUF_OFF_CLIENT(cd);
248                 send_offset = io_u_buf_off + SEND_OFFSET;
249                 send_ptr = cd->io_us_msgs + send_offset;
250                 (void) gpspm_flush_request__pack(&Flush_req_last, send_ptr);
251
252                 /* send the flush message */
253                 if ((ret = rpma_send(ccd->conn, cd->msg_mr, send_offset,
254                                 flush_req_size, RPMA_F_COMPLETION_ALWAYS,
255                                 NULL)))
256                         librpma_td_verror(td, ret, "rpma_send");
257
258                 ++ccd->op_send_posted;
259
260                 /* Wait for the SEND to complete */
261                 (void) librpma_fio_client_io_complete_all_sends(td);
262         }
263
264         /* deregister the messaging buffer memory */
265         if ((ret = rpma_mr_dereg(&cd->msg_mr)))
266                 librpma_td_verror(td, ret, "rpma_mr_dereg");
267
268         free(ccd->client_data);
269
270         librpma_fio_client_cleanup(td);
271 }
272
273 static inline int client_io_flush(struct thread_data *td,
274                 struct io_u *first_io_u, struct io_u *last_io_u,
275                 unsigned long long int len)
276 {
277         struct librpma_fio_client_data *ccd = td->io_ops_data;
278         struct client_data *cd = ccd->client_data;
279         size_t io_u_buf_off = IO_U_NEXT_BUF_OFF_CLIENT(cd);
280         size_t send_offset = io_u_buf_off + SEND_OFFSET;
281         size_t recv_offset = io_u_buf_off + RECV_OFFSET;
282         void *send_ptr = cd->io_us_msgs + send_offset;
283         void *recv_ptr = cd->io_us_msgs + recv_offset;
284         GPSPMFlushRequest flush_req = GPSPM_FLUSH_REQUEST__INIT;
285         size_t flush_req_size = 0;
286         int ret;
287
288         /* prepare a response buffer */
289         if ((ret = rpma_recv(ccd->conn, cd->msg_mr, recv_offset, MAX_MSG_SIZE,
290                         recv_ptr))) {
291                 librpma_td_verror(td, ret, "rpma_recv");
292                 return -1;
293         }
294
295         /* prepare a flush message and pack it to a send buffer */
296         flush_req.offset = first_io_u->offset;
297         flush_req.length = len;
298         flush_req.op_context = last_io_u->index;
299         flush_req_size = gpspm_flush_request__get_packed_size(&flush_req);
300         if (flush_req_size > MAX_MSG_SIZE) {
301                 log_err(
302                         "Packed flush request size is bigger than available send buffer space (%"
303                         PRIu64 " > %d\n", flush_req_size, MAX_MSG_SIZE);
304                 return -1;
305         }
306         (void) gpspm_flush_request__pack(&flush_req, send_ptr);
307
308         /* send the flush message */
309         if ((ret = rpma_send(ccd->conn, cd->msg_mr, send_offset, flush_req_size,
310                         RPMA_F_COMPLETION_ALWAYS, NULL))) {
311                 librpma_td_verror(td, ret, "rpma_send");
312                 return -1;
313         }
314
315         ++ccd->op_send_posted;
316
317         return 0;
318 }
319
320 static int client_get_io_u_index(struct rpma_completion *cmpl,
321                 unsigned int *io_u_index)
322 {
323         GPSPMFlushResponse *flush_resp;
324
325         if (cmpl->op != RPMA_OP_RECV)
326                 return 0;
327
328         /* unpack a response from the received buffer */
329         flush_resp = gpspm_flush_response__unpack(NULL,
330                         cmpl->byte_len, cmpl->op_context);
331         if (flush_resp == NULL) {
332                 log_err("Cannot unpack the flush response buffer\n");
333                 return -1;
334         }
335
336         memcpy(io_u_index, &flush_resp->op_context, sizeof(*io_u_index));
337
338         gpspm_flush_response__free_unpacked(flush_resp, NULL);
339
340         return 1;
341 }
342
343 FIO_STATIC struct ioengine_ops ioengine_client = {
344         .name                   = "librpma_gpspm_client",
345         .version                = FIO_IOOPS_VERSION,
346         .init                   = client_init,
347         .post_init              = client_post_init,
348         .get_file_size          = librpma_fio_client_get_file_size,
349         .open_file              = librpma_fio_file_nop,
350         .queue                  = librpma_fio_client_queue,
351         .commit                 = librpma_fio_client_commit,
352         .getevents              = librpma_fio_client_getevents,
353         .event                  = librpma_fio_client_event,
354         .errdetails             = librpma_fio_client_errdetails,
355         .close_file             = librpma_fio_file_nop,
356         .cleanup                = client_cleanup,
357         .flags                  = FIO_DISKLESSIO,
358         .options                = librpma_fio_options,
359         .option_struct_size     = sizeof(struct librpma_fio_options_values),
360 };
361
362 /* server side implementation */
363
364 #define IO_U_BUFF_OFF_SERVER(i) (i * IO_U_BUF_LEN)
365
366 struct server_data {
367         /* aligned td->orig_buffer */
368         char *orig_buffer_aligned;
369
370         /* resources for messaging buffer from DRAM allocated by fio */
371         struct rpma_mr_local *msg_mr;
372
373         uint32_t msg_sqe_available; /* # of free SQ slots */
374
375         /* in-memory queues */
376         struct rpma_completion *msgs_queued;
377         uint32_t msg_queued_nr;
378 };
379
380 static int server_init(struct thread_data *td)
381 {
382         struct librpma_fio_server_data *csd;
383         struct server_data *sd;
384         int ret = -1;
385
386         if ((ret = librpma_fio_server_init(td)))
387                 return ret;
388
389         csd = td->io_ops_data;
390
391         /* allocate server's data */
392         sd = calloc(1, sizeof(*sd));
393         if (sd == NULL) {
394                 td_verror(td, errno, "calloc");
395                 goto err_server_cleanup;
396         }
397
398         /* allocate in-memory queue */
399         sd->msgs_queued = calloc(td->o.iodepth, sizeof(*sd->msgs_queued));
400         if (sd->msgs_queued == NULL) {
401                 td_verror(td, errno, "calloc");
402                 goto err_free_sd;
403         }
404
405         /*
406          * Assure a single io_u buffer can store both SEND and RECV messages and
407          * an io_us buffer allocation is page-size-aligned which is required
408          * to register for RDMA. User-provided values are intentionally ignored.
409          */
410         td->o.max_bs[DDIR_READ] = IO_U_BUF_LEN;
411         td->o.mem_align = page_size;
412
413         csd->server_data = sd;
414
415         return 0;
416
417 err_free_sd:
418         free(sd);
419
420 err_server_cleanup:
421         librpma_fio_server_cleanup(td);
422
423         return -1;
424 }
425
426 static int server_post_init(struct thread_data *td)
427 {
428         struct librpma_fio_server_data *csd = td->io_ops_data;
429         struct server_data *sd = csd->server_data;
430         size_t io_us_size;
431         size_t io_u_buflen;
432         int ret;
433
434         /*
435          * td->orig_buffer is not aligned. The engine requires aligned io_us
436          * so FIO alignes up the address using the formula below.
437          */
438         sd->orig_buffer_aligned = PTR_ALIGN(td->orig_buffer, page_mask) +
439                         td->o.mem_align;
440
441         /*
442          * XXX
443          * Each io_u message buffer contains recv and send messages.
444          * Aligning each of those buffers may potentially give
445          * some performance benefits.
446          */
447         io_u_buflen = td_max_bs(td);
448
449         /* check whether io_u buffer is big enough */
450         if (io_u_buflen < IO_U_BUF_LEN) {
451                 log_err(
452                         "blocksize too small to accommodate assumed maximal request/response pair size (%" PRIu64 " < %d)\n",
453                         io_u_buflen, IO_U_BUF_LEN);
454                 return -1;
455         }
456
457         /*
458          * td->orig_buffer_size beside the space really consumed by io_us
459          * has paddings which can be omitted for the memory registration.
460          */
461         io_us_size = (unsigned long long)io_u_buflen *
462                         (unsigned long long)td->o.iodepth;
463
464         if ((ret = rpma_mr_reg(csd->peer, sd->orig_buffer_aligned, io_us_size,
465                         RPMA_MR_USAGE_SEND | RPMA_MR_USAGE_RECV,
466                         &sd->msg_mr))) {
467                 librpma_td_verror(td, ret, "rpma_mr_reg");
468                 return -1;
469         }
470
471         return 0;
472 }
473
474 static void server_cleanup(struct thread_data *td)
475 {
476         struct librpma_fio_server_data *csd = td->io_ops_data;
477         struct server_data *sd;
478         int ret;
479
480         if (csd == NULL)
481                 return;
482
483         sd = csd->server_data;
484
485         if (sd != NULL) {
486                 /* rpma_mr_dereg(messaging buffer from DRAM) */
487                 if ((ret = rpma_mr_dereg(&sd->msg_mr)))
488                         librpma_td_verror(td, ret, "rpma_mr_dereg");
489
490                 free(sd->msgs_queued);
491                 free(sd);
492         }
493
494         librpma_fio_server_cleanup(td);
495 }
496
497 static int prepare_connection(struct thread_data *td,
498                 struct rpma_conn_req *conn_req)
499 {
500         struct librpma_fio_server_data *csd = td->io_ops_data;
501         struct server_data *sd = csd->server_data;
502         int ret;
503         int i;
504
505         /* prepare buffers for a flush requests */
506         sd->msg_sqe_available = td->o.iodepth;
507         for (i = 0; i < td->o.iodepth; i++) {
508                 size_t offset_recv_msg = IO_U_BUFF_OFF_SERVER(i) + RECV_OFFSET;
509                 if ((ret = rpma_conn_req_recv(conn_req, sd->msg_mr,
510                                 offset_recv_msg, MAX_MSG_SIZE,
511                                 (const void *)(uintptr_t)i))) {
512                         librpma_td_verror(td, ret, "rpma_conn_req_recv");
513                         return ret;
514                 }
515         }
516
517         return 0;
518 }
519
520 static int server_open_file(struct thread_data *td, struct fio_file *f)
521 {
522         struct librpma_fio_server_data *csd = td->io_ops_data;
523         struct rpma_conn_cfg *cfg = NULL;
524         uint16_t max_msg_num = td->o.iodepth;
525         int ret;
526
527         csd->prepare_connection = prepare_connection;
528
529         /* create a connection configuration object */
530         if ((ret = rpma_conn_cfg_new(&cfg))) {
531                 librpma_td_verror(td, ret, "rpma_conn_cfg_new");
532                 return -1;
533         }
534
535         /*
536          * Calculate the required queue sizes where:
537          * - the send queue (SQ) has to be big enough to accommodate
538          *   all possible flush requests (SENDs)
539          * - the receive queue (RQ) has to be big enough to accommodate
540          *   all flush responses (RECVs)
541          * - the completion queue (CQ) has to be big enough to accommodate
542          *   all success and error completions (sq_size + rq_size)
543          */
544         if ((ret = rpma_conn_cfg_set_sq_size(cfg, max_msg_num))) {
545                 librpma_td_verror(td, ret, "rpma_conn_cfg_set_sq_size");
546                 goto err_cfg_delete;
547         }
548         if ((ret = rpma_conn_cfg_set_rq_size(cfg, max_msg_num))) {
549                 librpma_td_verror(td, ret, "rpma_conn_cfg_set_rq_size");
550                 goto err_cfg_delete;
551         }
552         if ((ret = rpma_conn_cfg_set_cq_size(cfg, max_msg_num * 2))) {
553                 librpma_td_verror(td, ret, "rpma_conn_cfg_set_cq_size");
554                 goto err_cfg_delete;
555         }
556
557         ret = librpma_fio_server_open_file(td, f, cfg);
558
559 err_cfg_delete:
560         (void) rpma_conn_cfg_delete(&cfg);
561
562         return ret;
563 }
564
565 static int server_qe_process(struct thread_data *td,
566                 struct rpma_completion *cmpl)
567 {
568         struct librpma_fio_server_data *csd = td->io_ops_data;
569         struct server_data *sd = csd->server_data;
570         GPSPMFlushRequest *flush_req;
571         GPSPMFlushResponse flush_resp = GPSPM_FLUSH_RESPONSE__INIT;
572         size_t flush_resp_size = 0;
573         size_t send_buff_offset;
574         size_t recv_buff_offset;
575         size_t io_u_buff_offset;
576         void *send_buff_ptr;
577         void *recv_buff_ptr;
578         void *op_ptr;
579         int msg_index;
580         int ret;
581
582         /* calculate SEND/RECV pair parameters */
583         msg_index = (int)(uintptr_t)cmpl->op_context;
584         io_u_buff_offset = IO_U_BUFF_OFF_SERVER(msg_index);
585         send_buff_offset = io_u_buff_offset + SEND_OFFSET;
586         recv_buff_offset = io_u_buff_offset + RECV_OFFSET;
587         send_buff_ptr = sd->orig_buffer_aligned + send_buff_offset;
588         recv_buff_ptr = sd->orig_buffer_aligned + recv_buff_offset;
589
590         /* unpack a flush request from the received buffer */
591         flush_req = gpspm_flush_request__unpack(NULL, cmpl->byte_len,
592                         recv_buff_ptr);
593         if (flush_req == NULL) {
594                 log_err("cannot unpack the flush request buffer\n");
595                 goto err_terminate;
596         }
597
598         if (IS_NOT_THE_LAST_MESSAGE(flush_req)) {
599                 op_ptr = csd->ws_ptr + flush_req->offset;
600                 pmem_persist(op_ptr, flush_req->length);
601         } else {
602                 /*
603                  * This is the last message - the client is done.
604                  */
605                 gpspm_flush_request__free_unpacked(flush_req, NULL);
606                 td->done = true;
607                 return 0;
608         }
609
610         /* initiate the next receive operation */
611         if ((ret = rpma_recv(csd->conn, sd->msg_mr, recv_buff_offset,
612                         MAX_MSG_SIZE,
613                         (const void *)(uintptr_t)msg_index))) {
614                 librpma_td_verror(td, ret, "rpma_recv");
615                 goto err_free_unpacked;
616         }
617
618         /* prepare a flush response and pack it to a send buffer */
619         flush_resp.op_context = flush_req->op_context;
620         flush_resp_size = gpspm_flush_response__get_packed_size(&flush_resp);
621         if (flush_resp_size > MAX_MSG_SIZE) {
622                 log_err(
623                         "Size of the packed flush response is bigger than the available space of the send buffer (%"
624                         PRIu64 " > %i\n", flush_resp_size, MAX_MSG_SIZE);
625                 goto err_free_unpacked;
626         }
627
628         (void) gpspm_flush_response__pack(&flush_resp, send_buff_ptr);
629
630         /* send the flush response */
631         if ((ret = rpma_send(csd->conn, sd->msg_mr, send_buff_offset,
632                         flush_resp_size, RPMA_F_COMPLETION_ALWAYS, NULL))) {
633                 librpma_td_verror(td, ret, "rpma_send");
634                 goto err_free_unpacked;
635         }
636         --sd->msg_sqe_available;
637
638         gpspm_flush_request__free_unpacked(flush_req, NULL);
639
640         return 0;
641
642 err_free_unpacked:
643         gpspm_flush_request__free_unpacked(flush_req, NULL);
644
645 err_terminate:
646         td->terminate = true;
647
648         return -1;
649 }
650
651 static inline int server_queue_process(struct thread_data *td)
652 {
653         struct librpma_fio_server_data *csd = td->io_ops_data;
654         struct server_data *sd = csd->server_data;
655         int ret;
656         int i;
657
658         /* min(# of queue entries, # of SQ entries available) */
659         uint32_t qes_to_process = min(sd->msg_queued_nr, sd->msg_sqe_available);
660         if (qes_to_process == 0)
661                 return 0;
662
663         /* process queued completions */
664         for (i = 0; i < qes_to_process; ++i) {
665                 if ((ret = server_qe_process(td, &sd->msgs_queued[i])))
666                         return ret;
667         }
668
669         /* progress the queue */
670         for (i = 0; i < sd->msg_queued_nr - qes_to_process; ++i) {
671                 memcpy(&sd->msgs_queued[i],
672                         &sd->msgs_queued[qes_to_process + i],
673                         sizeof(sd->msgs_queued[i]));
674         }
675
676         sd->msg_queued_nr -= qes_to_process;
677
678         return 0;
679 }
680
681 static int server_cmpl_process(struct thread_data *td)
682 {
683         struct librpma_fio_server_data *csd = td->io_ops_data;
684         struct server_data *sd = csd->server_data;
685         struct rpma_completion *cmpl = &sd->msgs_queued[sd->msg_queued_nr];
686         int ret;
687
688         ret = rpma_conn_completion_get(csd->conn, cmpl);
689         if (ret == RPMA_E_NO_COMPLETION) {
690                 /* lack of completion is not an error */
691                 return 0;
692         } else if (ret != 0) {
693                 librpma_td_verror(td, ret, "rpma_conn_completion_get");
694                 goto err_terminate;
695         }
696
697         /* validate the completion */
698         if (cmpl->op_status != IBV_WC_SUCCESS)
699                 goto err_terminate;
700
701         if (cmpl->op == RPMA_OP_RECV)
702                 ++sd->msg_queued_nr;
703         else if (cmpl->op == RPMA_OP_SEND)
704                 ++sd->msg_sqe_available;
705
706         return 0;
707
708 err_terminate:
709         td->terminate = true;
710
711         return -1;
712 }
713
714 static enum fio_q_status server_queue(struct thread_data *td, struct io_u *io_u)
715 {
716         do {
717                 if (server_cmpl_process(td))
718                         return FIO_Q_BUSY;
719
720                 if (server_queue_process(td))
721                         return FIO_Q_BUSY;
722
723         } while (!td->done);
724
725         return FIO_Q_COMPLETED;
726 }
727
728 FIO_STATIC struct ioengine_ops ioengine_server = {
729         .name                   = "librpma_gpspm_server",
730         .version                = FIO_IOOPS_VERSION,
731         .init                   = server_init,
732         .post_init              = server_post_init,
733         .open_file              = server_open_file,
734         .close_file             = librpma_fio_server_close_file,
735         .queue                  = server_queue,
736         .invalidate             = librpma_fio_file_nop,
737         .cleanup                = server_cleanup,
738         .flags                  = FIO_SYNCIO,
739         .options                = librpma_fio_options,
740         .option_struct_size     = sizeof(struct librpma_fio_options_values),
741 };
742
743 /* register both engines */
744
745 static void fio_init fio_librpma_gpspm_register(void)
746 {
747         register_ioengine(&ioengine_client);
748         register_ioengine(&ioengine_server);
749 }
750
751 static void fio_exit fio_librpma_gpspm_unregister(void)
752 {
753         unregister_ioengine(&ioengine_client);
754         unregister_ioengine(&ioengine_server);
755 }