/*
 * librpma_gpspm: IO engine that uses PMDK librpma to write data,
 * based on General Purpose Server Persistency Method
 *
 * Copyright 2020-2021, Intel Corporation
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License,
 * version 2 as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 */

#include "librpma_fio.h"

#include <libpmem.h>

/* Generated by the protocol buffer compiler from: librpma_gpspm_flush.proto */
#include "librpma_gpspm_flush.pb-c.h"

#define MAX_MSG_SIZE (512)
#define IO_U_BUF_LEN (2 * MAX_MSG_SIZE)
#define SEND_OFFSET (0)
#define RECV_OFFSET (SEND_OFFSET + MAX_MSG_SIZE)
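
/*
 * Layout of a single io_u message buffer (IO_U_BUF_LEN == 1024 B),
 * as derived from the macros above:
 *
 *	+------------------------+------------------------+
 *	|  SEND message (512 B)  |  RECV message (512 B)  |
 *	+------------------------+------------------------+
 *	^ SEND_OFFSET == 0       ^ RECV_OFFSET == 512
 */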

#define GPSPM_FLUSH_REQUEST__LAST \
	{ PROTOBUF_C_MESSAGE_INIT(&gpspm_flush_request__descriptor), 0, 0, 0 }

/*
 * 'Flush_req_last' is the last flush request
 * the client has to send to the server to indicate
 * that the client is done.
 */
static const GPSPMFlushRequest Flush_req_last = GPSPM_FLUSH_REQUEST__LAST;

#define IS_NOT_THE_LAST_MESSAGE(flush_req) \
	(flush_req->length != Flush_req_last.length || \
	flush_req->offset != Flush_req_last.offset)
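
/*
 * Note: GPSPM_FLUSH_REQUEST__LAST zero-initializes all message fields, so a
 * request with length == 0 and offset == 0 serves as the termination notice,
 * which is exactly what IS_NOT_THE_LAST_MESSAGE() checks against.
 */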

/* client side implementation */

/* get the next io_u message buffer in round-robin fashion */
#define IO_U_NEXT_BUF_OFF_CLIENT(cd) \
	(IO_U_BUF_LEN * ((cd->msg_curr++) % cd->msg_num))
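
/*
 * For example, with cd->msg_num == 4, successive calls to
 * IO_U_NEXT_BUF_OFF_CLIENT() return the offsets 0, 1024, 2048, 3072,
 * 0, 1024, ... - cd->msg_curr is free-running and wraps via the modulo.
 */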

struct client_data {
	/* memory for the send and receive message buffers */
	char *io_us_msgs;

	/* resources for the messaging buffer */
	uint32_t msg_num;
	uint32_t msg_curr;
	struct rpma_mr_local *msg_mr;
};

static inline int client_io_flush(struct thread_data *td,
		struct io_u *first_io_u, struct io_u *last_io_u,
		unsigned long long int len);

static int client_get_io_u_index(struct ibv_wc *wc, unsigned int *io_u_index);

static int client_init(struct thread_data *td)
{
	struct librpma_fio_client_data *ccd;
	struct client_data *cd;
	uint32_t write_num;
	struct rpma_conn_cfg *cfg = NULL;
	int ret;

	/*
	 * not supported:
	 * - readwrite = read / trim / randread / randtrim /
	 *   rw / randrw / trimwrite
	 */
	if (td_read(td) || td_trim(td)) {
		td_verror(td, EINVAL, "Unsupported mode.");
		return -1;
	}

	/* allocate client's data */
	cd = calloc(1, sizeof(*cd));
	if (cd == NULL) {
		td_verror(td, errno, "calloc");
		return -1;
	}

	/*
	 * Calculate the required number of WRITEs and FLUSHes.
	 *
	 * Note: Each flush is a request (SEND) and response (RECV) pair.
	 */
	if (td_random(td)) {
		write_num = td->o.iodepth; /* WRITE * N */
		cd->msg_num = td->o.iodepth; /* FLUSH * N */
	} else {
		if (td->o.sync_io) {
			write_num = 1; /* WRITE */
			cd->msg_num = 1; /* FLUSH */
		} else {
			write_num = td->o.iodepth; /* WRITE * N */
			/*
			 * FLUSH * B where:
			 * - B == ceil(iodepth / iodepth_batch)
			 *   which is the number of batches for N writes
			 */
			cd->msg_num = LIBRPMA_FIO_CEIL(td->o.iodepth,
					td->o.iodepth_batch);
		}
	}
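
	/*
	 * For example, a sequential asynchronous job with iodepth=16 and
	 * iodepth_batch=4 requires write_num == 16 and
	 * cd->msg_num == ceil(16 / 4) == 4.
	 */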

	/* create a connection configuration object */
	if ((ret = rpma_conn_cfg_new(&cfg))) {
		librpma_td_verror(td, ret, "rpma_conn_cfg_new");
		goto err_free_cd;
	}

	/*
	 * Calculate the required queue sizes where:
	 * - the send queue (SQ) has to be big enough to accommodate
	 *   all io_us (WRITEs) and all flush requests (SENDs)
	 * - the receive queue (RQ) has to be big enough to accommodate
	 *   all flush responses (RECVs)
	 * - the completion queue (CQ) has to be big enough to accommodate all
	 *   success and error completions (sq_size + rq_size)
	 */
	if ((ret = rpma_conn_cfg_set_sq_size(cfg, write_num + cd->msg_num))) {
		librpma_td_verror(td, ret, "rpma_conn_cfg_set_sq_size");
		goto err_cfg_delete;
	}
	if ((ret = rpma_conn_cfg_set_rq_size(cfg, cd->msg_num))) {
		librpma_td_verror(td, ret, "rpma_conn_cfg_set_rq_size");
		goto err_cfg_delete;
	}
	if ((ret = rpma_conn_cfg_set_cq_size(cfg, write_num + cd->msg_num * 2))) {
		librpma_td_verror(td, ret, "rpma_conn_cfg_set_cq_size");
		goto err_cfg_delete;
	}
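
	/*
	 * Continuing the example above (write_num == 16, cd->msg_num == 4):
	 * sq_size == 16 + 4 == 20, rq_size == 4 and
	 * cq_size == 16 + 2 * 4 == 24, so the CQ can absorb every SQ and RQ
	 * completion (20 + 4).
	 */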

	if (librpma_fio_client_init(td, cfg))
		goto err_cfg_delete;

	ccd = td->io_ops_data;

	if (ccd->ws->direct_write_to_pmem &&
	    ccd->server_mr_flush_type == RPMA_FLUSH_TYPE_PERSISTENT &&
	    td->thread_number == 1) {
		/* XXX log_info mixes with the JSON output */
		log_err(
			"Note: The server side supports Direct Write to PMem and it is equipped with PMem (direct_write_to_pmem).\n"
			"You can use librpma_client and librpma_server engines for better performance instead of GPSPM.\n");
	}

	/* validate the server's RQ capacity */
	if (cd->msg_num > ccd->ws->max_msg_num) {
		log_err(
			"server's RQ size (iodepth) too small to handle the client's workspace requirements (%u < %u)\n",
			ccd->ws->max_msg_num, cd->msg_num);
		goto err_cleanup_common;
	}

	if ((ret = rpma_conn_cfg_delete(&cfg))) {
		librpma_td_verror(td, ret, "rpma_conn_cfg_delete");
		/* non-fatal error - continue */
	}

	ccd->flush = client_io_flush;
	ccd->get_io_u_index = client_get_io_u_index;
	ccd->client_data = cd;

	return 0;

err_cleanup_common:
	librpma_fio_client_cleanup(td);

err_cfg_delete:
	(void) rpma_conn_cfg_delete(&cfg);

err_free_cd:
	free(cd);

	return -1;
}

static int client_post_init(struct thread_data *td)
{
	struct librpma_fio_client_data *ccd = td->io_ops_data;
	struct client_data *cd = ccd->client_data;
	unsigned int io_us_msgs_size;
	int ret;

	/* message buffers initialization and registration */
	io_us_msgs_size = cd->msg_num * IO_U_BUF_LEN;
	if ((ret = posix_memalign((void **)&cd->io_us_msgs, page_size,
			io_us_msgs_size))) {
		td_verror(td, ret, "posix_memalign");
		return ret;
	}
	if ((ret = rpma_mr_reg(ccd->peer, cd->io_us_msgs, io_us_msgs_size,
			RPMA_MR_USAGE_SEND | RPMA_MR_USAGE_RECV,
			&cd->msg_mr))) {
		librpma_td_verror(td, ret, "rpma_mr_reg");
		/* free the messaging buffer on registration failure */
		free(cd->io_us_msgs);
		return ret;
	}

	return librpma_fio_client_post_init(td);
}

static void client_cleanup(struct thread_data *td)
{
	struct librpma_fio_client_data *ccd = td->io_ops_data;
	struct client_data *cd;
	size_t flush_req_size;
	size_t io_u_buf_off;
	size_t send_offset;
	void *send_ptr;
	int ret;

	if (ccd == NULL)
		return;

	cd = ccd->client_data;
	if (cd == NULL) {
		librpma_fio_client_cleanup(td);
		return;
	}

	/*
	 * Make sure all SEND completions are collected so there are free
	 * slots in the SQ for the last SEND message.
	 *
	 * Note: Even if any operation fails, we can still send the
	 * termination notice.
	 */
	(void) librpma_fio_client_io_complete_all_sends(td);

	/* prepare the last flush message and pack it to the send buffer */
	flush_req_size = gpspm_flush_request__get_packed_size(&Flush_req_last);
	if (flush_req_size > MAX_MSG_SIZE) {
		log_err(
			"Packed flush request size is bigger than available send buffer space (%zu > %d)\n",
			flush_req_size, MAX_MSG_SIZE);
	} else {
		io_u_buf_off = IO_U_NEXT_BUF_OFF_CLIENT(cd);
		send_offset = io_u_buf_off + SEND_OFFSET;
		send_ptr = cd->io_us_msgs + send_offset;
		(void) gpspm_flush_request__pack(&Flush_req_last, send_ptr);

		/* send the flush message */
		if ((ret = rpma_send(ccd->conn, cd->msg_mr, send_offset,
				flush_req_size, RPMA_F_COMPLETION_ALWAYS,
				NULL)))
			librpma_td_verror(td, ret, "rpma_send");

		++ccd->op_send_posted;

		/* wait for the SEND to complete */
		(void) librpma_fio_client_io_complete_all_sends(td);
	}

	/* deregister the messaging buffer memory */
	if ((ret = rpma_mr_dereg(&cd->msg_mr)))
		librpma_td_verror(td, ret, "rpma_mr_dereg");

	free(cd->io_us_msgs);
	free(ccd->client_data);

	librpma_fio_client_cleanup(td);
}

static inline int client_io_flush(struct thread_data *td,
		struct io_u *first_io_u, struct io_u *last_io_u,
		unsigned long long int len)
{
	struct librpma_fio_client_data *ccd = td->io_ops_data;
	struct client_data *cd = ccd->client_data;
	size_t io_u_buf_off = IO_U_NEXT_BUF_OFF_CLIENT(cd);
	size_t send_offset = io_u_buf_off + SEND_OFFSET;
	size_t recv_offset = io_u_buf_off + RECV_OFFSET;
	void *send_ptr = cd->io_us_msgs + send_offset;
	void *recv_ptr = cd->io_us_msgs + recv_offset;
	GPSPMFlushRequest flush_req = GPSPM_FLUSH_REQUEST__INIT;
	size_t flush_req_size = 0;
	int ret;

	/* prepare a response buffer */
	if ((ret = rpma_recv(ccd->conn, cd->msg_mr, recv_offset, MAX_MSG_SIZE,
			recv_ptr))) {
		librpma_td_verror(td, ret, "rpma_recv");
		return -1;
	}

	/* prepare a flush message and pack it to a send buffer */
	flush_req.offset = first_io_u->offset;
	flush_req.length = len;
	flush_req.op_context = last_io_u->index;
	flush_req_size = gpspm_flush_request__get_packed_size(&flush_req);
	if (flush_req_size > MAX_MSG_SIZE) {
		log_err(
			"Packed flush request size is bigger than available send buffer space (%zu > %d)\n",
			flush_req_size, MAX_MSG_SIZE);
		return -1;
	}
	(void) gpspm_flush_request__pack(&flush_req, send_ptr);

	/* send the flush message */
	if ((ret = rpma_send(ccd->conn, cd->msg_mr, send_offset, flush_req_size,
			RPMA_F_COMPLETION_ALWAYS, NULL))) {
		librpma_td_verror(td, ret, "rpma_send");
		return -1;
	}

	++ccd->op_send_posted;

	return 0;
}

static int client_get_io_u_index(struct ibv_wc *wc, unsigned int *io_u_index)
{
	GPSPMFlushResponse *flush_resp;

	if (wc->opcode != IBV_WC_RECV)
		return 0;

	/* unpack a response from the received buffer */
	flush_resp = gpspm_flush_response__unpack(NULL,
			wc->byte_len, (void *)wc->wr_id);
	if (flush_resp == NULL) {
		log_err("Cannot unpack the flush response buffer\n");
		return -1;
	}

	memcpy(io_u_index, &flush_resp->op_context, sizeof(*io_u_index));

	gpspm_flush_response__free_unpacked(flush_resp, NULL);

	return 1;
}

FIO_STATIC struct ioengine_ops ioengine_client = {
	.name = "librpma_gpspm_client",
	.version = FIO_IOOPS_VERSION,
	.init = client_init,
	.post_init = client_post_init,
	.get_file_size = librpma_fio_client_get_file_size,
	.open_file = librpma_fio_file_nop,
	.queue = librpma_fio_client_queue,
	.commit = librpma_fio_client_commit,
	.getevents = librpma_fio_client_getevents,
	.event = librpma_fio_client_event,
	.errdetails = librpma_fio_client_errdetails,
	.close_file = librpma_fio_file_nop,
	.cleanup = client_cleanup,
	.flags = FIO_DISKLESSIO,
	.options = librpma_fio_options,
	.option_struct_size = sizeof(struct librpma_fio_options_values),
};
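
/*
 * A minimal client job sketch (the serverip and port option names come from
 * librpma_fio_options; the address and port values below are illustrative):
 *
 *	[client]
 *	ioengine=librpma_gpspm_client
 *	serverip=192.168.0.1
 *	port=7204
 *	rw=write
 *	bs=4k
 *	iodepth=16
 */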

/* server side implementation */

#define IO_U_BUFF_OFF_SERVER(i) ((i) * IO_U_BUF_LEN)

struct server_data {
	/* aligned td->orig_buffer */
	char *orig_buffer_aligned;

	/* resources for the messaging buffer from DRAM allocated by fio */
	struct rpma_mr_local *msg_mr;

	uint32_t msg_sqe_available; /* # of free SQ slots */

	/* in-memory queues */
	struct ibv_wc *msgs_queued;
	uint32_t msg_queued_nr;
};

static int server_init(struct thread_data *td)
{
	struct librpma_fio_server_data *csd;
	struct server_data *sd;
	int ret = -1;

	if ((ret = librpma_fio_server_init(td)))
		return ret;

	csd = td->io_ops_data;

	/* allocate server's data */
	sd = calloc(1, sizeof(*sd));
	if (sd == NULL) {
		td_verror(td, errno, "calloc");
		goto err_server_cleanup;
	}

	/* allocate in-memory queue */
	sd->msgs_queued = calloc(td->o.iodepth, sizeof(*sd->msgs_queued));
	if (sd->msgs_queued == NULL) {
		td_verror(td, errno, "calloc");
		goto err_free_sd;
	}

	/*
	 * Ensure a single io_u buffer can store both SEND and RECV messages
	 * and that the io_us buffer allocation is page-size-aligned, which
	 * is required to register the memory for RDMA. User-provided values
	 * are intentionally ignored.
	 */
	td->o.max_bs[DDIR_READ] = IO_U_BUF_LEN;
	td->o.mem_align = page_size;

	csd->server_data = sd;

	return 0;

err_free_sd:
	free(sd);

err_server_cleanup:
	librpma_fio_server_cleanup(td);

	return -1;
}

static int server_post_init(struct thread_data *td)
{
	struct librpma_fio_server_data *csd = td->io_ops_data;
	struct server_data *sd = csd->server_data;
	size_t io_us_size;
	size_t io_u_buflen;
	int ret;

	/*
	 * td->orig_buffer is not aligned. The engine requires aligned io_us
	 * so FIO aligns up the address using the formula below.
	 */
	sd->orig_buffer_aligned = PTR_ALIGN(td->orig_buffer, page_mask) +
			td->o.mem_align;
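
	/*
	 * An illustrative example: with page_size == 4096, an orig_buffer at
	 * 0x...0100 is rounded up by PTR_ALIGN() to the page boundary at
	 * 0x...1000, and td->o.mem_align (== page_size, set in server_init())
	 * is added on top of it.
	 */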

	/*
	 * XXX
	 * Each io_u message buffer contains recv and send messages.
	 * Aligning each of those buffers may potentially give
	 * some performance benefits.
	 */
	io_u_buflen = td_max_bs(td);

	/* check whether the io_u buffer is big enough */
	if (io_u_buflen < IO_U_BUF_LEN) {
		log_err(
			"blocksize too small to accommodate assumed maximal request/response pair size (%zu < %d)\n",
			io_u_buflen, IO_U_BUF_LEN);
		return -1;
	}

	/*
	 * Besides the space actually consumed by io_us, td->orig_buffer_size
	 * includes padding which can be omitted from the memory registration.
	 */
	io_us_size = (unsigned long long)io_u_buflen *
			(unsigned long long)td->o.iodepth;

	if ((ret = rpma_mr_reg(csd->peer, sd->orig_buffer_aligned, io_us_size,
			RPMA_MR_USAGE_SEND | RPMA_MR_USAGE_RECV,
			&sd->msg_mr))) {
		librpma_td_verror(td, ret, "rpma_mr_reg");
		return -1;
	}

	return 0;
}

static void server_cleanup(struct thread_data *td)
{
	struct librpma_fio_server_data *csd = td->io_ops_data;
	struct server_data *sd;
	int ret;

	if (csd == NULL)
		return;

	sd = csd->server_data;

	if (sd != NULL) {
		/* rpma_mr_dereg(messaging buffer from DRAM) */
		if ((ret = rpma_mr_dereg(&sd->msg_mr)))
			librpma_td_verror(td, ret, "rpma_mr_dereg");

		free(sd->msgs_queued);
		free(sd);
	}

	librpma_fio_server_cleanup(td);
}

static int prepare_connection(struct thread_data *td,
		struct rpma_conn_req *conn_req)
{
	struct librpma_fio_server_data *csd = td->io_ops_data;
	struct server_data *sd = csd->server_data;
	int ret;
	int i;

	/* prepare buffers for flush requests */
	sd->msg_sqe_available = td->o.iodepth;
	for (i = 0; i < td->o.iodepth; i++) {
		size_t offset_recv_msg = IO_U_BUFF_OFF_SERVER(i) + RECV_OFFSET;
		if ((ret = rpma_conn_req_recv(conn_req, sd->msg_mr,
				offset_recv_msg, MAX_MSG_SIZE,
				(const void *)(uintptr_t)i))) {
			librpma_td_verror(td, ret, "rpma_conn_req_recv");
			return ret;
		}
	}

	return 0;
}

static int server_open_file(struct thread_data *td, struct fio_file *f)
{
	struct librpma_fio_server_data *csd = td->io_ops_data;
	struct rpma_conn_cfg *cfg = NULL;
	uint16_t max_msg_num = td->o.iodepth;
	int ret;

	csd->prepare_connection = prepare_connection;

	/* create a connection configuration object */
	if ((ret = rpma_conn_cfg_new(&cfg))) {
		librpma_td_verror(td, ret, "rpma_conn_cfg_new");
		return -1;
	}

	/*
	 * Calculate the required queue sizes where:
	 * - the send queue (SQ) has to be big enough to accommodate
	 *   all possible flush responses (SENDs)
	 * - the receive queue (RQ) has to be big enough to accommodate
	 *   all flush requests (RECVs)
	 * - the completion queue (CQ) has to be big enough to accommodate
	 *   all success and error completions (sq_size + rq_size)
	 */
	if ((ret = rpma_conn_cfg_set_sq_size(cfg, max_msg_num))) {
		librpma_td_verror(td, ret, "rpma_conn_cfg_set_sq_size");
		goto err_cfg_delete;
	}
	if ((ret = rpma_conn_cfg_set_rq_size(cfg, max_msg_num))) {
		librpma_td_verror(td, ret, "rpma_conn_cfg_set_rq_size");
		goto err_cfg_delete;
	}
	if ((ret = rpma_conn_cfg_set_cq_size(cfg, max_msg_num * 2))) {
		librpma_td_verror(td, ret, "rpma_conn_cfg_set_cq_size");
		goto err_cfg_delete;
	}
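
	/*
	 * For example, with iodepth=16 this gives sq_size == 16,
	 * rq_size == 16 and cq_size == 32, so the CQ can absorb every SQ
	 * and RQ completion.
	 */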

	ret = librpma_fio_server_open_file(td, f, cfg);

err_cfg_delete:
	(void) rpma_conn_cfg_delete(&cfg);

	return ret;
}

static int server_qe_process(struct thread_data *td, struct ibv_wc *wc)
{
	struct librpma_fio_server_data *csd = td->io_ops_data;
	struct server_data *sd = csd->server_data;
	GPSPMFlushRequest *flush_req;
	GPSPMFlushResponse flush_resp = GPSPM_FLUSH_RESPONSE__INIT;
	size_t flush_resp_size = 0;
	size_t send_buff_offset;
	size_t recv_buff_offset;
	size_t io_u_buff_offset;
	void *send_buff_ptr;
	void *recv_buff_ptr;
	void *op_ptr;
	int msg_index;
	int ret;

	/* calculate SEND/RECV pair parameters */
	msg_index = (int)(uintptr_t)wc->wr_id;
	io_u_buff_offset = IO_U_BUFF_OFF_SERVER(msg_index);
	send_buff_offset = io_u_buff_offset + SEND_OFFSET;
	recv_buff_offset = io_u_buff_offset + RECV_OFFSET;
	send_buff_ptr = sd->orig_buffer_aligned + send_buff_offset;
	recv_buff_ptr = sd->orig_buffer_aligned + recv_buff_offset;

	/* unpack a flush request from the received buffer */
	flush_req = gpspm_flush_request__unpack(NULL, wc->byte_len,
			recv_buff_ptr);
	if (flush_req == NULL) {
		log_err("cannot unpack the flush request buffer\n");
		goto err_terminate;
	}

	if (IS_NOT_THE_LAST_MESSAGE(flush_req)) {
		op_ptr = csd->ws_ptr + flush_req->offset;
		pmem_persist(op_ptr, flush_req->length);
	} else {
		/*
		 * This is the last message - the client is done.
		 */
		gpspm_flush_request__free_unpacked(flush_req, NULL);
		td->done = true;
		return 0;
	}

	/* initiate the next receive operation */
	if ((ret = rpma_recv(csd->conn, sd->msg_mr, recv_buff_offset,
			MAX_MSG_SIZE,
			(const void *)(uintptr_t)msg_index))) {
		librpma_td_verror(td, ret, "rpma_recv");
		goto err_free_unpacked;
	}

	/* prepare a flush response and pack it to a send buffer */
	flush_resp.op_context = flush_req->op_context;
	flush_resp_size = gpspm_flush_response__get_packed_size(&flush_resp);
	if (flush_resp_size > MAX_MSG_SIZE) {
		log_err(
			"Size of the packed flush response is bigger than the available space of the send buffer (%zu > %d)\n",
			flush_resp_size, MAX_MSG_SIZE);
		goto err_free_unpacked;
	}

	(void) gpspm_flush_response__pack(&flush_resp, send_buff_ptr);

	/* send the flush response */
	if ((ret = rpma_send(csd->conn, sd->msg_mr, send_buff_offset,
			flush_resp_size, RPMA_F_COMPLETION_ALWAYS, NULL))) {
		librpma_td_verror(td, ret, "rpma_send");
		goto err_free_unpacked;
	}
	--sd->msg_sqe_available;

	gpspm_flush_request__free_unpacked(flush_req, NULL);

	return 0;

err_free_unpacked:
	gpspm_flush_request__free_unpacked(flush_req, NULL);

err_terminate:
	td->terminate = true;

	return -1;
}

static inline int server_queue_process(struct thread_data *td)
{
	struct librpma_fio_server_data *csd = td->io_ops_data;
	struct server_data *sd = csd->server_data;
	int ret;
	int i;

	/* min(# of queue entries, # of SQ entries available) */
	uint32_t qes_to_process = min(sd->msg_queued_nr, sd->msg_sqe_available);
	if (qes_to_process == 0)
		return 0;
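
	/*
	 * For example, with 5 queued flush requests but only 3 free SQ slots,
	 * qes_to_process == 3; the remaining 2 entries are shifted to the
	 * front of the queue below and retried on the next call.
	 */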

	/* process queued completions */
	for (i = 0; i < qes_to_process; ++i) {
		if ((ret = server_qe_process(td, &sd->msgs_queued[i])))
			return ret;
	}

	/* shift the remaining queue entries to the front */
	for (i = 0; i < sd->msg_queued_nr - qes_to_process; ++i) {
		memcpy(&sd->msgs_queued[i],
			&sd->msgs_queued[qes_to_process + i],
			sizeof(sd->msgs_queued[i]));
	}

	sd->msg_queued_nr -= qes_to_process;

	return 0;
}

static int server_cmpl_process(struct thread_data *td)
{
	struct librpma_fio_server_data *csd = td->io_ops_data;
	struct server_data *sd = csd->server_data;
	struct ibv_wc *wc = &sd->msgs_queued[sd->msg_queued_nr];
	struct librpma_fio_options_values *o = td->eo;
	int ret;

	ret = rpma_cq_get_wc(csd->cq, 1, wc, NULL);
	if (ret == RPMA_E_NO_COMPLETION) {
		if (o->busy_wait_polling == 0) {
			ret = rpma_cq_wait(csd->cq);
			if (ret == RPMA_E_NO_COMPLETION) {
				/* lack of completion is not an error */
				return 0;
			} else if (ret != 0) {
				librpma_td_verror(td, ret, "rpma_cq_wait");
				goto err_terminate;
			}

			ret = rpma_cq_get_wc(csd->cq, 1, wc, NULL);
			if (ret == RPMA_E_NO_COMPLETION) {
				/* lack of completion is not an error */
				return 0;
			} else if (ret != 0) {
				librpma_td_verror(td, ret, "rpma_cq_get_wc");
				goto err_terminate;
			}
		} else {
			/* lack of completion is not an error */
			return 0;
		}
	} else if (ret != 0) {
		librpma_td_verror(td, ret, "rpma_cq_get_wc");
		goto err_terminate;
	}

	/* validate the completion */
	if (wc->status != IBV_WC_SUCCESS)
		goto err_terminate;

	if (wc->opcode == IBV_WC_RECV)
		++sd->msg_queued_nr;
	else if (wc->opcode == IBV_WC_SEND)
		++sd->msg_sqe_available;

	return 0;

err_terminate:
	td->terminate = true;

	return -1;
}

static enum fio_q_status server_queue(struct thread_data *td, struct io_u *io_u)
{
	do {
		if (server_cmpl_process(td))
			return FIO_Q_BUSY;

		if (server_queue_process(td))
			return FIO_Q_BUSY;

	} while (!td->done);

	return FIO_Q_COMPLETED;
}

FIO_STATIC struct ioengine_ops ioengine_server = {
	.name = "librpma_gpspm_server",
	.version = FIO_IOOPS_VERSION,
	.init = server_init,
	.post_init = server_post_init,
	.open_file = server_open_file,
	.close_file = librpma_fio_server_close_file,
	.queue = server_queue,
	.invalidate = librpma_fio_file_nop,
	.cleanup = server_cleanup,
	.flags = FIO_SYNCIO,
	.options = librpma_fio_options,
	.option_struct_size = sizeof(struct librpma_fio_options_values),
};
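
/*
 * A minimal server job sketch (a PMem-backed file or DAX device is expected
 * as the filename, since flush requests are persisted via pmem_persist();
 * the values below are illustrative):
 *
 *	[server]
 *	ioengine=librpma_gpspm_server
 *	serverip=192.168.0.1
 *	port=7204
 *	filename=/mnt/pmem/fio-gpspm
 */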

/* register both engines */

static void fio_init fio_librpma_gpspm_register(void)
{
	register_ioengine(&ioengine_client);
	register_ioengine(&ioengine_server);
}

static void fio_exit fio_librpma_gpspm_unregister(void)
{
	unregister_ioengine(&ioengine_client);
	unregister_ioengine(&ioengine_server);
}