Merge branch 'global_dedup' of https://github.com/bardavid/fio
[fio.git] / engines / librpma_gpspm.c
1 /*
2  * librpma_gpspm: IO engine that uses PMDK librpma to write data,
3  *              based on General Purpose Server Persistency Method
4  *
5  * Copyright 2020-2021, Intel Corporation
6  *
7  * This program is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU General Public License,
9  * version 2 as published by the Free Software Foundation..
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  */
16
17 #include "librpma_fio.h"
18
19 #include <libpmem.h>
20
21 /* Generated by the protocol buffer compiler from: librpma_gpspm_flush.proto */
22 #include "librpma_gpspm_flush.pb-c.h"
23
24 #define MAX_MSG_SIZE (512)
25 #define IO_U_BUF_LEN (2 * MAX_MSG_SIZE)
26 #define SEND_OFFSET (0)
27 #define RECV_OFFSET (SEND_OFFSET + MAX_MSG_SIZE)
28
29 #define GPSPM_FLUSH_REQUEST__LAST \
30         { PROTOBUF_C_MESSAGE_INIT(&gpspm_flush_request__descriptor), 0, 0, 0 }
31
32 /*
33  * 'Flush_req_last' is the last flush request
34  * the client has to send to server to indicate
35  * that the client is done.
36  */
37 static const GPSPMFlushRequest Flush_req_last = GPSPM_FLUSH_REQUEST__LAST;
38
39 #define IS_NOT_THE_LAST_MESSAGE(flush_req) \
40         (flush_req->length != Flush_req_last.length || \
41         flush_req->offset != Flush_req_last.offset)
42
43 /* client side implementation */
44
45 /* get next io_u message buffer in the round-robin fashion */
46 #define IO_U_NEXT_BUF_OFF_CLIENT(cd) \
47         (IO_U_BUF_LEN * ((cd->msg_curr++) % cd->msg_num))
48
49 struct client_data {
50         /* memory for sending and receiving buffered */
51         char *io_us_msgs;
52
53         /* resources for messaging buffer */
54         uint32_t msg_num;
55         uint32_t msg_curr;
56         struct rpma_mr_local *msg_mr;
57 };
58
59 static inline int client_io_flush(struct thread_data *td,
60                 struct io_u *first_io_u, struct io_u *last_io_u,
61                 unsigned long long int len);
62
63 static int client_get_io_u_index(struct ibv_wc *wc, unsigned int *io_u_index);
64
65 static int client_init(struct thread_data *td)
66 {
67         struct librpma_fio_client_data *ccd;
68         struct client_data *cd;
69         uint32_t write_num;
70         struct rpma_conn_cfg *cfg = NULL;
71         int ret;
72
73         /*
74          * not supported:
75          * - readwrite = read / trim / randread / randtrim /
76          *               / rw / randrw / trimwrite
77          */
78         if (td_read(td) || td_trim(td)) {
79                 td_verror(td, EINVAL, "Not supported mode.");
80                 return -1;
81         }
82
83         /* allocate client's data */
84         cd = calloc(1, sizeof(*cd));
85         if (cd == NULL) {
86                 td_verror(td, errno, "calloc");
87                 return -1;
88         }
89
90         /*
91          * Calculate the required number of WRITEs and FLUSHes.
92          *
93          * Note: Each flush is a request (SEND) and response (RECV) pair.
94          */
95         if (td_random(td)) {
96                 write_num = td->o.iodepth; /* WRITE * N */
97                 cd->msg_num = td->o.iodepth; /* FLUSH * N */
98         } else {
99                 if (td->o.sync_io) {
100                         write_num = 1; /* WRITE */
101                         cd->msg_num = 1; /* FLUSH */
102                 } else {
103                         write_num = td->o.iodepth; /* WRITE * N */
104                         /*
105                          * FLUSH * B where:
106                          * - B == ceil(iodepth / iodepth_batch)
107                          *   which is the number of batches for N writes
108                          */
109                         cd->msg_num = LIBRPMA_FIO_CEIL(td->o.iodepth,
110                                         td->o.iodepth_batch);
111                 }
112         }
113
114         /* create a connection configuration object */
115         if ((ret = rpma_conn_cfg_new(&cfg))) {
116                 librpma_td_verror(td, ret, "rpma_conn_cfg_new");
117                 goto err_free_cd;
118         }
119
120         /*
121          * Calculate the required queue sizes where:
122          * - the send queue (SQ) has to be big enough to accommodate
123          *   all io_us (WRITEs) and all flush requests (SENDs)
124          * - the receive queue (RQ) has to be big enough to accommodate
125          *   all flush responses (RECVs)
126          * - the completion queue (CQ) has to be big enough to accommodate all
127          *   success and error completions (sq_size + rq_size)
128          */
129         if ((ret = rpma_conn_cfg_set_sq_size(cfg, write_num + cd->msg_num))) {
130                 librpma_td_verror(td, ret, "rpma_conn_cfg_set_sq_size");
131                 goto err_cfg_delete;
132         }
133         if ((ret = rpma_conn_cfg_set_rq_size(cfg, cd->msg_num))) {
134                 librpma_td_verror(td, ret, "rpma_conn_cfg_set_rq_size");
135                 goto err_cfg_delete;
136         }
137         if ((ret = rpma_conn_cfg_set_cq_size(cfg, write_num + cd->msg_num * 2))) {
138                 librpma_td_verror(td, ret, "rpma_conn_cfg_set_cq_size");
139                 goto err_cfg_delete;
140         }
141
142         if (librpma_fio_client_init(td, cfg))
143                 goto err_cfg_delete;
144
145         ccd = td->io_ops_data;
146
147         if (ccd->ws->direct_write_to_pmem &&
148             ccd->server_mr_flush_type == RPMA_FLUSH_TYPE_PERSISTENT &&
149             td->thread_number == 1) {
150                 /* XXX log_info mixes with the JSON output */
151                 log_err(
152                         "Note: The server side supports Direct Write to PMem and it is equipped with PMem (direct_write_to_pmem).\n"
153                         "You can use librpma_client and librpma_server engines for better performance instead of GPSPM.\n");
154         }
155
156         /* validate the server's RQ capacity */
157         if (cd->msg_num > ccd->ws->max_msg_num) {
158                 log_err(
159                         "server's RQ size (iodepth) too small to handle the client's workspace requirements (%u < %u)\n",
160                         ccd->ws->max_msg_num, cd->msg_num);
161                 goto err_cleanup_common;
162         }
163
164         if ((ret = rpma_conn_cfg_delete(&cfg))) {
165                 librpma_td_verror(td, ret, "rpma_conn_cfg_delete");
166                 /* non fatal error - continue */
167         }
168
169         ccd->flush = client_io_flush;
170         ccd->get_io_u_index = client_get_io_u_index;
171         ccd->client_data = cd;
172
173         return 0;
174
175 err_cleanup_common:
176         librpma_fio_client_cleanup(td);
177
178 err_cfg_delete:
179         (void) rpma_conn_cfg_delete(&cfg);
180
181 err_free_cd:
182         free(cd);
183
184         return -1;
185 }
186
187 static int client_post_init(struct thread_data *td)
188 {
189         struct librpma_fio_client_data *ccd = td->io_ops_data;
190         struct client_data *cd = ccd->client_data;
191         unsigned int io_us_msgs_size;
192         int ret;
193
194         /* message buffers initialization and registration */
195         io_us_msgs_size = cd->msg_num * IO_U_BUF_LEN;
196         if ((ret = posix_memalign((void **)&cd->io_us_msgs, page_size,
197                         io_us_msgs_size))) {
198                 td_verror(td, ret, "posix_memalign");
199                 return ret;
200         }
201         if ((ret = rpma_mr_reg(ccd->peer, cd->io_us_msgs, io_us_msgs_size,
202                         RPMA_MR_USAGE_SEND | RPMA_MR_USAGE_RECV,
203                         &cd->msg_mr))) {
204                 librpma_td_verror(td, ret, "rpma_mr_reg");
205                 return ret;
206         }
207
208         return librpma_fio_client_post_init(td);
209 }
210
211 static void client_cleanup(struct thread_data *td)
212 {
213         struct librpma_fio_client_data *ccd = td->io_ops_data;
214         struct client_data *cd;
215         size_t flush_req_size;
216         size_t io_u_buf_off;
217         size_t send_offset;
218         void *send_ptr;
219         int ret;
220
221         if (ccd == NULL)
222                 return;
223
224         cd = ccd->client_data;
225         if (cd == NULL) {
226                 librpma_fio_client_cleanup(td);
227                 return;
228         }
229
230         /*
231          * Make sure all SEND completions are collected ergo there are free
232          * slots in the SQ for the last SEND message.
233          *
234          * Note: If any operation will fail we still can send the termination
235          * notice.
236          */
237         (void) librpma_fio_client_io_complete_all_sends(td);
238
239         /* prepare the last flush message and pack it to the send buffer */
240         flush_req_size = gpspm_flush_request__get_packed_size(&Flush_req_last);
241         if (flush_req_size > MAX_MSG_SIZE) {
242                 log_err(
243                         "Packed flush request size is bigger than available send buffer space (%zu > %d\n",
244                         flush_req_size, MAX_MSG_SIZE);
245         } else {
246                 io_u_buf_off = IO_U_NEXT_BUF_OFF_CLIENT(cd);
247                 send_offset = io_u_buf_off + SEND_OFFSET;
248                 send_ptr = cd->io_us_msgs + send_offset;
249                 (void) gpspm_flush_request__pack(&Flush_req_last, send_ptr);
250
251                 /* send the flush message */
252                 if ((ret = rpma_send(ccd->conn, cd->msg_mr, send_offset,
253                                 flush_req_size, RPMA_F_COMPLETION_ALWAYS,
254                                 NULL)))
255                         librpma_td_verror(td, ret, "rpma_send");
256
257                 ++ccd->op_send_posted;
258
259                 /* Wait for the SEND to complete */
260                 (void) librpma_fio_client_io_complete_all_sends(td);
261         }
262
263         /* deregister the messaging buffer memory */
264         if ((ret = rpma_mr_dereg(&cd->msg_mr)))
265                 librpma_td_verror(td, ret, "rpma_mr_dereg");
266
267         free(ccd->client_data);
268
269         librpma_fio_client_cleanup(td);
270 }
271
272 static inline int client_io_flush(struct thread_data *td,
273                 struct io_u *first_io_u, struct io_u *last_io_u,
274                 unsigned long long int len)
275 {
276         struct librpma_fio_client_data *ccd = td->io_ops_data;
277         struct client_data *cd = ccd->client_data;
278         size_t io_u_buf_off = IO_U_NEXT_BUF_OFF_CLIENT(cd);
279         size_t send_offset = io_u_buf_off + SEND_OFFSET;
280         size_t recv_offset = io_u_buf_off + RECV_OFFSET;
281         void *send_ptr = cd->io_us_msgs + send_offset;
282         void *recv_ptr = cd->io_us_msgs + recv_offset;
283         GPSPMFlushRequest flush_req = GPSPM_FLUSH_REQUEST__INIT;
284         size_t flush_req_size = 0;
285         int ret;
286
287         /* prepare a response buffer */
288         if ((ret = rpma_recv(ccd->conn, cd->msg_mr, recv_offset, MAX_MSG_SIZE,
289                         recv_ptr))) {
290                 librpma_td_verror(td, ret, "rpma_recv");
291                 return -1;
292         }
293
294         /* prepare a flush message and pack it to a send buffer */
295         flush_req.offset = first_io_u->offset;
296         flush_req.length = len;
297         flush_req.op_context = last_io_u->index;
298         flush_req_size = gpspm_flush_request__get_packed_size(&flush_req);
299         if (flush_req_size > MAX_MSG_SIZE) {
300                 log_err(
301                         "Packed flush request size is bigger than available send buffer space (%"
302                         PRIu64 " > %d\n", flush_req_size, MAX_MSG_SIZE);
303                 return -1;
304         }
305         (void) gpspm_flush_request__pack(&flush_req, send_ptr);
306
307         /* send the flush message */
308         if ((ret = rpma_send(ccd->conn, cd->msg_mr, send_offset, flush_req_size,
309                         RPMA_F_COMPLETION_ALWAYS, NULL))) {
310                 librpma_td_verror(td, ret, "rpma_send");
311                 return -1;
312         }
313
314         ++ccd->op_send_posted;
315
316         return 0;
317 }
318
319 static int client_get_io_u_index(struct ibv_wc *wc, unsigned int *io_u_index)
320 {
321         GPSPMFlushResponse *flush_resp;
322
323         if (wc->opcode != IBV_WC_RECV)
324                 return 0;
325
326         /* unpack a response from the received buffer */
327         flush_resp = gpspm_flush_response__unpack(NULL,
328                         wc->byte_len, (void *)wc->wr_id);
329         if (flush_resp == NULL) {
330                 log_err("Cannot unpack the flush response buffer\n");
331                 return -1;
332         }
333
334         memcpy(io_u_index, &flush_resp->op_context, sizeof(*io_u_index));
335
336         gpspm_flush_response__free_unpacked(flush_resp, NULL);
337
338         return 1;
339 }
340
341 FIO_STATIC struct ioengine_ops ioengine_client = {
342         .name                   = "librpma_gpspm_client",
343         .version                = FIO_IOOPS_VERSION,
344         .init                   = client_init,
345         .post_init              = client_post_init,
346         .get_file_size          = librpma_fio_client_get_file_size,
347         .open_file              = librpma_fio_file_nop,
348         .queue                  = librpma_fio_client_queue,
349         .commit                 = librpma_fio_client_commit,
350         .getevents              = librpma_fio_client_getevents,
351         .event                  = librpma_fio_client_event,
352         .errdetails             = librpma_fio_client_errdetails,
353         .close_file             = librpma_fio_file_nop,
354         .cleanup                = client_cleanup,
355         .flags                  = FIO_DISKLESSIO,
356         .options                = librpma_fio_options,
357         .option_struct_size     = sizeof(struct librpma_fio_options_values),
358 };
359
360 /* server side implementation */
361
362 #define IO_U_BUFF_OFF_SERVER(i) (i * IO_U_BUF_LEN)
363
364 struct server_data {
365         /* aligned td->orig_buffer */
366         char *orig_buffer_aligned;
367
368         /* resources for messaging buffer from DRAM allocated by fio */
369         struct rpma_mr_local *msg_mr;
370
371         uint32_t msg_sqe_available; /* # of free SQ slots */
372
373         /* in-memory queues */
374         struct ibv_wc *msgs_queued;
375         uint32_t msg_queued_nr;
376 };
377
378 static int server_init(struct thread_data *td)
379 {
380         struct librpma_fio_server_data *csd;
381         struct server_data *sd;
382         int ret = -1;
383
384         if ((ret = librpma_fio_server_init(td)))
385                 return ret;
386
387         csd = td->io_ops_data;
388
389         /* allocate server's data */
390         sd = calloc(1, sizeof(*sd));
391         if (sd == NULL) {
392                 td_verror(td, errno, "calloc");
393                 goto err_server_cleanup;
394         }
395
396         /* allocate in-memory queue */
397         sd->msgs_queued = calloc(td->o.iodepth, sizeof(*sd->msgs_queued));
398         if (sd->msgs_queued == NULL) {
399                 td_verror(td, errno, "calloc");
400                 goto err_free_sd;
401         }
402
403         /*
404          * Assure a single io_u buffer can store both SEND and RECV messages and
405          * an io_us buffer allocation is page-size-aligned which is required
406          * to register for RDMA. User-provided values are intentionally ignored.
407          */
408         td->o.max_bs[DDIR_READ] = IO_U_BUF_LEN;
409         td->o.mem_align = page_size;
410
411         csd->server_data = sd;
412
413         return 0;
414
415 err_free_sd:
416         free(sd);
417
418 err_server_cleanup:
419         librpma_fio_server_cleanup(td);
420
421         return -1;
422 }
423
424 static int server_post_init(struct thread_data *td)
425 {
426         struct librpma_fio_server_data *csd = td->io_ops_data;
427         struct server_data *sd = csd->server_data;
428         size_t io_us_size;
429         size_t io_u_buflen;
430         int ret;
431
432         /*
433          * td->orig_buffer is not aligned. The engine requires aligned io_us
434          * so FIO aligns up the address using the formula below.
435          */
436         sd->orig_buffer_aligned = PTR_ALIGN(td->orig_buffer, page_mask) +
437                         td->o.mem_align;
438
439         /*
440          * XXX
441          * Each io_u message buffer contains recv and send messages.
442          * Aligning each of those buffers may potentially give
443          * some performance benefits.
444          */
445         io_u_buflen = td_max_bs(td);
446
447         /* check whether io_u buffer is big enough */
448         if (io_u_buflen < IO_U_BUF_LEN) {
449                 log_err(
450                         "blocksize too small to accommodate assumed maximal request/response pair size (%" PRIu64 " < %d)\n",
451                         io_u_buflen, IO_U_BUF_LEN);
452                 return -1;
453         }
454
455         /*
456          * td->orig_buffer_size beside the space really consumed by io_us
457          * has paddings which can be omitted for the memory registration.
458          */
459         io_us_size = (unsigned long long)io_u_buflen *
460                         (unsigned long long)td->o.iodepth;
461
462         if ((ret = rpma_mr_reg(csd->peer, sd->orig_buffer_aligned, io_us_size,
463                         RPMA_MR_USAGE_SEND | RPMA_MR_USAGE_RECV,
464                         &sd->msg_mr))) {
465                 librpma_td_verror(td, ret, "rpma_mr_reg");
466                 return -1;
467         }
468
469         return 0;
470 }
471
472 static void server_cleanup(struct thread_data *td)
473 {
474         struct librpma_fio_server_data *csd = td->io_ops_data;
475         struct server_data *sd;
476         int ret;
477
478         if (csd == NULL)
479                 return;
480
481         sd = csd->server_data;
482
483         if (sd != NULL) {
484                 /* rpma_mr_dereg(messaging buffer from DRAM) */
485                 if ((ret = rpma_mr_dereg(&sd->msg_mr)))
486                         librpma_td_verror(td, ret, "rpma_mr_dereg");
487
488                 free(sd->msgs_queued);
489                 free(sd);
490         }
491
492         librpma_fio_server_cleanup(td);
493 }
494
495 static int prepare_connection(struct thread_data *td,
496                 struct rpma_conn_req *conn_req)
497 {
498         struct librpma_fio_server_data *csd = td->io_ops_data;
499         struct server_data *sd = csd->server_data;
500         int ret;
501         int i;
502
503         /* prepare buffers for a flush requests */
504         sd->msg_sqe_available = td->o.iodepth;
505         for (i = 0; i < td->o.iodepth; i++) {
506                 size_t offset_recv_msg = IO_U_BUFF_OFF_SERVER(i) + RECV_OFFSET;
507                 if ((ret = rpma_conn_req_recv(conn_req, sd->msg_mr,
508                                 offset_recv_msg, MAX_MSG_SIZE,
509                                 (const void *)(uintptr_t)i))) {
510                         librpma_td_verror(td, ret, "rpma_conn_req_recv");
511                         return ret;
512                 }
513         }
514
515         return 0;
516 }
517
518 static int server_open_file(struct thread_data *td, struct fio_file *f)
519 {
520         struct librpma_fio_server_data *csd = td->io_ops_data;
521         struct rpma_conn_cfg *cfg = NULL;
522         uint16_t max_msg_num = td->o.iodepth;
523         int ret;
524
525         csd->prepare_connection = prepare_connection;
526
527         /* create a connection configuration object */
528         if ((ret = rpma_conn_cfg_new(&cfg))) {
529                 librpma_td_verror(td, ret, "rpma_conn_cfg_new");
530                 return -1;
531         }
532
533         /*
534          * Calculate the required queue sizes where:
535          * - the send queue (SQ) has to be big enough to accommodate
536          *   all possible flush requests (SENDs)
537          * - the receive queue (RQ) has to be big enough to accommodate
538          *   all flush responses (RECVs)
539          * - the completion queue (CQ) has to be big enough to accommodate
540          *   all success and error completions (sq_size + rq_size)
541          */
542         if ((ret = rpma_conn_cfg_set_sq_size(cfg, max_msg_num))) {
543                 librpma_td_verror(td, ret, "rpma_conn_cfg_set_sq_size");
544                 goto err_cfg_delete;
545         }
546         if ((ret = rpma_conn_cfg_set_rq_size(cfg, max_msg_num))) {
547                 librpma_td_verror(td, ret, "rpma_conn_cfg_set_rq_size");
548                 goto err_cfg_delete;
549         }
550         if ((ret = rpma_conn_cfg_set_cq_size(cfg, max_msg_num * 2))) {
551                 librpma_td_verror(td, ret, "rpma_conn_cfg_set_cq_size");
552                 goto err_cfg_delete;
553         }
554
555         ret = librpma_fio_server_open_file(td, f, cfg);
556
557 err_cfg_delete:
558         (void) rpma_conn_cfg_delete(&cfg);
559
560         return ret;
561 }
562
563 static int server_qe_process(struct thread_data *td, struct ibv_wc *wc)
564 {
565         struct librpma_fio_server_data *csd = td->io_ops_data;
566         struct server_data *sd = csd->server_data;
567         GPSPMFlushRequest *flush_req;
568         GPSPMFlushResponse flush_resp = GPSPM_FLUSH_RESPONSE__INIT;
569         size_t flush_resp_size = 0;
570         size_t send_buff_offset;
571         size_t recv_buff_offset;
572         size_t io_u_buff_offset;
573         void *send_buff_ptr;
574         void *recv_buff_ptr;
575         void *op_ptr;
576         int msg_index;
577         int ret;
578
579         /* calculate SEND/RECV pair parameters */
580         msg_index = (int)(uintptr_t)wc->wr_id;
581         io_u_buff_offset = IO_U_BUFF_OFF_SERVER(msg_index);
582         send_buff_offset = io_u_buff_offset + SEND_OFFSET;
583         recv_buff_offset = io_u_buff_offset + RECV_OFFSET;
584         send_buff_ptr = sd->orig_buffer_aligned + send_buff_offset;
585         recv_buff_ptr = sd->orig_buffer_aligned + recv_buff_offset;
586
587         /* unpack a flush request from the received buffer */
588         flush_req = gpspm_flush_request__unpack(NULL, wc->byte_len,
589                         recv_buff_ptr);
590         if (flush_req == NULL) {
591                 log_err("cannot unpack the flush request buffer\n");
592                 goto err_terminate;
593         }
594
595         if (IS_NOT_THE_LAST_MESSAGE(flush_req)) {
596                 op_ptr = csd->ws_ptr + flush_req->offset;
597                 pmem_persist(op_ptr, flush_req->length);
598         } else {
599                 /*
600                  * This is the last message - the client is done.
601                  */
602                 gpspm_flush_request__free_unpacked(flush_req, NULL);
603                 td->done = true;
604                 return 0;
605         }
606
607         /* initiate the next receive operation */
608         if ((ret = rpma_recv(csd->conn, sd->msg_mr, recv_buff_offset,
609                         MAX_MSG_SIZE,
610                         (const void *)(uintptr_t)msg_index))) {
611                 librpma_td_verror(td, ret, "rpma_recv");
612                 goto err_free_unpacked;
613         }
614
615         /* prepare a flush response and pack it to a send buffer */
616         flush_resp.op_context = flush_req->op_context;
617         flush_resp_size = gpspm_flush_response__get_packed_size(&flush_resp);
618         if (flush_resp_size > MAX_MSG_SIZE) {
619                 log_err(
620                         "Size of the packed flush response is bigger than the available space of the send buffer (%"
621                         PRIu64 " > %i\n", flush_resp_size, MAX_MSG_SIZE);
622                 goto err_free_unpacked;
623         }
624
625         (void) gpspm_flush_response__pack(&flush_resp, send_buff_ptr);
626
627         /* send the flush response */
628         if ((ret = rpma_send(csd->conn, sd->msg_mr, send_buff_offset,
629                         flush_resp_size, RPMA_F_COMPLETION_ALWAYS, NULL))) {
630                 librpma_td_verror(td, ret, "rpma_send");
631                 goto err_free_unpacked;
632         }
633         --sd->msg_sqe_available;
634
635         gpspm_flush_request__free_unpacked(flush_req, NULL);
636
637         return 0;
638
639 err_free_unpacked:
640         gpspm_flush_request__free_unpacked(flush_req, NULL);
641
642 err_terminate:
643         td->terminate = true;
644
645         return -1;
646 }
647
648 static inline int server_queue_process(struct thread_data *td)
649 {
650         struct librpma_fio_server_data *csd = td->io_ops_data;
651         struct server_data *sd = csd->server_data;
652         int ret;
653         int i;
654
655         /* min(# of queue entries, # of SQ entries available) */
656         uint32_t qes_to_process = min(sd->msg_queued_nr, sd->msg_sqe_available);
657         if (qes_to_process == 0)
658                 return 0;
659
660         /* process queued completions */
661         for (i = 0; i < qes_to_process; ++i) {
662                 if ((ret = server_qe_process(td, &sd->msgs_queued[i])))
663                         return ret;
664         }
665
666         /* progress the queue */
667         for (i = 0; i < sd->msg_queued_nr - qes_to_process; ++i) {
668                 memcpy(&sd->msgs_queued[i],
669                         &sd->msgs_queued[qes_to_process + i],
670                         sizeof(sd->msgs_queued[i]));
671         }
672
673         sd->msg_queued_nr -= qes_to_process;
674
675         return 0;
676 }
677
678 static int server_cmpl_process(struct thread_data *td)
679 {
680         struct librpma_fio_server_data *csd = td->io_ops_data;
681         struct server_data *sd = csd->server_data;
682         struct ibv_wc *wc = &sd->msgs_queued[sd->msg_queued_nr];
683         struct librpma_fio_options_values *o = td->eo;
684         int ret;
685
686         ret = rpma_cq_get_wc(csd->cq, 1, wc, NULL);
687         if (ret == RPMA_E_NO_COMPLETION) {
688                 if (o->busy_wait_polling == 0) {
689                         ret = rpma_cq_wait(csd->cq);
690                         if (ret == RPMA_E_NO_COMPLETION) {
691                                 /* lack of completion is not an error */
692                                 return 0;
693                         } else if (ret != 0) {
694                                 librpma_td_verror(td, ret, "rpma_cq_wait");
695                                 goto err_terminate;
696                         }
697
698                         ret = rpma_cq_get_wc(csd->cq, 1, wc, NULL);
699                         if (ret == RPMA_E_NO_COMPLETION) {
700                                 /* lack of completion is not an error */
701                                 return 0;
702                         } else if (ret != 0) {
703                                 librpma_td_verror(td, ret, "rpma_cq_get_wc");
704                                 goto err_terminate;
705                         }
706                 } else {
707                         /* lack of completion is not an error */
708                         return 0;
709                 }
710         } else if (ret != 0) {
711                 librpma_td_verror(td, ret, "rpma_cq_get_wc");
712                 goto err_terminate;
713         }
714
715         /* validate the completion */
716         if (wc->status != IBV_WC_SUCCESS)
717                 goto err_terminate;
718
719         if (wc->opcode == IBV_WC_RECV)
720                 ++sd->msg_queued_nr;
721         else if (wc->opcode == IBV_WC_SEND)
722                 ++sd->msg_sqe_available;
723
724         return 0;
725
726 err_terminate:
727         td->terminate = true;
728
729         return -1;
730 }
731
732 static enum fio_q_status server_queue(struct thread_data *td, struct io_u *io_u)
733 {
734         do {
735                 if (server_cmpl_process(td))
736                         return FIO_Q_BUSY;
737
738                 if (server_queue_process(td))
739                         return FIO_Q_BUSY;
740
741         } while (!td->done);
742
743         return FIO_Q_COMPLETED;
744 }
745
746 FIO_STATIC struct ioengine_ops ioengine_server = {
747         .name                   = "librpma_gpspm_server",
748         .version                = FIO_IOOPS_VERSION,
749         .init                   = server_init,
750         .post_init              = server_post_init,
751         .open_file              = server_open_file,
752         .close_file             = librpma_fio_server_close_file,
753         .queue                  = server_queue,
754         .invalidate             = librpma_fio_file_nop,
755         .cleanup                = server_cleanup,
756         .flags                  = FIO_SYNCIO,
757         .options                = librpma_fio_options,
758         .option_struct_size     = sizeof(struct librpma_fio_options_values),
759 };
760
761 /* register both engines */
762
763 static void fio_init fio_librpma_gpspm_register(void)
764 {
765         register_ioengine(&ioengine_client);
766         register_ioengine(&ioengine_server);
767 }
768
769 static void fio_exit fio_librpma_gpspm_unregister(void)
770 {
771         unregister_ioengine(&ioengine_client);
772         unregister_ioengine(&ioengine_server);
773 }