/*
 * librpma_gpspm: IO engine that uses PMDK librpma to write data,
 * based on General Purpose Server Persistency Method
 *
 * Copyright 2020-2022, Intel Corporation
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License,
 * version 2 as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 */

#include "librpma_fio.h"

#ifdef CONFIG_LIBPMEM2_INSTALLED
#include <libpmem2.h>
#else
#include <libpmem.h>
#endif

/* Generated by the protocol buffer compiler from: librpma_gpspm_flush.proto */
#include "librpma_gpspm_flush.pb-c.h"

#define MAX_MSG_SIZE (512)
#define IO_U_BUF_LEN (2 * MAX_MSG_SIZE)
#define SEND_OFFSET (0)
#define RECV_OFFSET (SEND_OFFSET + MAX_MSG_SIZE)
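
/*
 * Layout of a single io_u message buffer (illustrative sketch derived from
 * the macros above): each IO_U_BUF_LEN-byte buffer holds a SEND/RECV pair:
 *
 *   [0 .............. 511][512 ............ 1023]
 *    SEND (flush request)  RECV (flush response)
 *
 * so the n-th buffer starts at n * IO_U_BUF_LEN, with its send message at
 * SEND_OFFSET and its receive message at RECV_OFFSET within the buffer.
 */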

#define GPSPM_FLUSH_REQUEST__LAST \
	{ PROTOBUF_C_MESSAGE_INIT(&gpspm_flush_request__descriptor), 0, 0, 0 }

/*
 * 'Flush_req_last' is the last flush request the client has to send to
 * the server to indicate that the client is done.
 */
static const GPSPMFlushRequest Flush_req_last = GPSPM_FLUSH_REQUEST__LAST;

#define IS_NOT_THE_LAST_MESSAGE(flush_req) \
	(flush_req->length != Flush_req_last.length || \
		flush_req->offset != Flush_req_last.offset)

/* client side implementation */

/* get the next io_u message buffer in a round-robin fashion */
#define IO_U_NEXT_BUF_OFF_CLIENT(cd) \
	(IO_U_BUF_LEN * ((cd->msg_curr++) % cd->msg_num))
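
/*
 * Illustrative example (assuming msg_num == 4): successive calls return
 * offsets 0, 1024, 2048, 3072, 0, 1024, ... (multiples of IO_U_BUF_LEN),
 * cycling through the four message buffers.
 */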

struct client_data {
	/* memory for the send and receive message buffers */
	char *io_us_msgs;

	/* resources for the messaging buffer */
	uint32_t msg_num;
	uint32_t msg_curr;
	struct rpma_mr_local *msg_mr;
};

static inline int client_io_flush(struct thread_data *td,
		struct io_u *first_io_u, struct io_u *last_io_u,
		unsigned long long int len);

static int client_get_io_u_index(struct ibv_wc *wc, unsigned int *io_u_index);

static int client_init(struct thread_data *td)
{
	struct librpma_fio_client_data *ccd;
	struct client_data *cd;
	uint32_t write_num;
	struct rpma_conn_cfg *cfg = NULL;
	int ret;

	/*
	 * not supported:
	 * - readwrite = read / trim / randread / randtrim /
	 *   rw / randrw / trimwrite
	 */
	if (td_read(td) || td_trim(td)) {
		td_verror(td, EINVAL, "Not supported mode.");
		return -1;
	}

	/* allocate client's data */
	cd = calloc(1, sizeof(*cd));
	if (cd == NULL) {
		td_verror(td, errno, "calloc");
		return -1;
	}

	/*
	 * Calculate the required number of WRITEs and FLUSHes.
	 *
	 * Note: Each flush is a request (SEND) and response (RECV) pair.
	 */
	if (td_random(td)) {
		write_num = td->o.iodepth; /* WRITE * N */
		cd->msg_num = td->o.iodepth; /* FLUSH * N */
	} else {
		if (td->o.sync_io) {
			write_num = 1; /* WRITE */
			cd->msg_num = 1; /* FLUSH */
		} else {
			write_num = td->o.iodepth; /* WRITE * N */
			/*
			 * FLUSH * B where:
			 * - B == ceil(iodepth / iodepth_batch)
			 * which is the number of batches for N writes
			 */
			cd->msg_num = LIBRPMA_FIO_CEIL(td->o.iodepth,
					td->o.iodepth_batch);
		}
	}
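
	/*
	 * Illustrative example (assuming a sequential, asynchronous job with
	 * iodepth=16 and iodepth_batch=4): write_num == 16 and
	 * cd->msg_num == ceil(16 / 4) == 4 (one flush per batch of writes),
	 * so the queues below are sized sq_size == 16 + 4 == 20,
	 * rq_size == 4 and cq_size == 16 + 2 * 4 == 24.
	 */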

	/* create a connection configuration object */
	if ((ret = rpma_conn_cfg_new(&cfg))) {
		librpma_td_verror(td, ret, "rpma_conn_cfg_new");
		goto err_free_cd;
	}

	/*
	 * Calculate the required queue sizes where:
	 * - the send queue (SQ) has to be big enough to accommodate
	 *   all io_us (WRITEs) and all flush requests (SENDs)
	 * - the receive queue (RQ) has to be big enough to accommodate
	 *   all flush responses (RECVs)
	 * - the completion queue (CQ) has to be big enough to accommodate all
	 *   success and error completions (sq_size + rq_size)
	 */
	if ((ret = rpma_conn_cfg_set_sq_size(cfg, write_num + cd->msg_num))) {
		librpma_td_verror(td, ret, "rpma_conn_cfg_set_sq_size");
		goto err_cfg_delete;
	}
	if ((ret = rpma_conn_cfg_set_rq_size(cfg, cd->msg_num))) {
		librpma_td_verror(td, ret, "rpma_conn_cfg_set_rq_size");
		goto err_cfg_delete;
	}
	if ((ret = rpma_conn_cfg_set_cq_size(cfg, write_num + cd->msg_num * 2))) {
		librpma_td_verror(td, ret, "rpma_conn_cfg_set_cq_size");
		goto err_cfg_delete;
	}

	if (librpma_fio_client_init(td, cfg))
		goto err_cfg_delete;

	ccd = td->io_ops_data;

	if (ccd->ws->direct_write_to_pmem &&
	    ccd->server_mr_flush_type == RPMA_FLUSH_TYPE_PERSISTENT &&
	    td->thread_number == 1) {
		/* XXX log_info mixes with the JSON output */
		log_err(
			"Note: The server side supports Direct Write to PMem and it is equipped with PMem (direct_write_to_pmem).\n"
			"You can use librpma_client and librpma_server engines for better performance instead of GPSPM.\n");
	}

	/* validate the server's RQ capacity */
	if (cd->msg_num > ccd->ws->max_msg_num) {
		log_err(
			"server's RQ size (iodepth) too small to handle the client's workspace requirements (%u < %u)\n",
			ccd->ws->max_msg_num, cd->msg_num);
		goto err_cleanup_common;
	}

	if ((ret = rpma_conn_cfg_delete(&cfg))) {
		librpma_td_verror(td, ret, "rpma_conn_cfg_delete");
		/* non-fatal error - continue */
	}

	ccd->flush = client_io_flush;
	ccd->get_io_u_index = client_get_io_u_index;
	ccd->client_data = cd;

	return 0;

err_cleanup_common:
	librpma_fio_client_cleanup(td);

err_cfg_delete:
	(void) rpma_conn_cfg_delete(&cfg);

err_free_cd:
	free(cd);

	return -1;
}

static int client_post_init(struct thread_data *td)
{
	struct librpma_fio_client_data *ccd = td->io_ops_data;
	struct client_data *cd = ccd->client_data;
	unsigned int io_us_msgs_size;
	int ret;

	/* message buffers initialization and registration */
	io_us_msgs_size = cd->msg_num * IO_U_BUF_LEN;
	if ((ret = posix_memalign((void **)&cd->io_us_msgs, page_size,
			io_us_msgs_size))) {
		td_verror(td, ret, "posix_memalign");
		return ret;
	}
	if ((ret = rpma_mr_reg(ccd->peer, cd->io_us_msgs, io_us_msgs_size,
			RPMA_MR_USAGE_SEND | RPMA_MR_USAGE_RECV,
			&cd->msg_mr))) {
		librpma_td_verror(td, ret, "rpma_mr_reg");
		return ret;
	}

	return librpma_fio_client_post_init(td);
}

static void client_cleanup(struct thread_data *td)
{
	struct librpma_fio_client_data *ccd = td->io_ops_data;
	struct client_data *cd;
	size_t flush_req_size;
	size_t io_u_buf_off;
	size_t send_offset;
	void *send_ptr;
	int ret;

	if (ccd == NULL)
		return;

	cd = ccd->client_data;
	if (cd == NULL) {
		librpma_fio_client_cleanup(td);
		return;
	}

	/*
	 * Make sure all SEND completions are collected so that there are
	 * free slots in the SQ for the last SEND message.
	 *
	 * Note: Even if any of the operations fail, we can still send the
	 * termination notice.
	 */
	(void) librpma_fio_client_io_complete_all_sends(td);

	/* prepare the last flush message and pack it to the send buffer */
	flush_req_size = gpspm_flush_request__get_packed_size(&Flush_req_last);
	if (flush_req_size > MAX_MSG_SIZE) {
		log_err(
			"Packed flush request size is bigger than available send buffer space (%zu > %d)\n",
			flush_req_size, MAX_MSG_SIZE);
	} else {
		io_u_buf_off = IO_U_NEXT_BUF_OFF_CLIENT(cd);
		send_offset = io_u_buf_off + SEND_OFFSET;
		send_ptr = cd->io_us_msgs + send_offset;
		(void) gpspm_flush_request__pack(&Flush_req_last, send_ptr);

		/* send the flush message */
		if ((ret = rpma_send(ccd->conn, cd->msg_mr, send_offset,
				flush_req_size, RPMA_F_COMPLETION_ALWAYS,
				NULL)))
			librpma_td_verror(td, ret, "rpma_send");

		++ccd->op_send_posted;

		/* wait for the SEND to complete */
		(void) librpma_fio_client_io_complete_all_sends(td);
	}

	/* deregister the messaging buffer memory */
	if ((ret = rpma_mr_dereg(&cd->msg_mr)))
		librpma_td_verror(td, ret, "rpma_mr_dereg");

	free(ccd->client_data);

	librpma_fio_client_cleanup(td);
}
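
/*
 * GPSPM flush flow on the client side (summary of the code below):
 * 1) post a RECV for the flush response,
 * 2) pack a GPSPMFlushRequest {offset, length, op_context} and SEND it,
 * 3) the response is collected later and matched back to the originating
 *    io_u via client_get_io_u_index().
 */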

static inline int client_io_flush(struct thread_data *td,
		struct io_u *first_io_u, struct io_u *last_io_u,
		unsigned long long int len)
{
	struct librpma_fio_client_data *ccd = td->io_ops_data;
	struct client_data *cd = ccd->client_data;
	size_t io_u_buf_off = IO_U_NEXT_BUF_OFF_CLIENT(cd);
	size_t send_offset = io_u_buf_off + SEND_OFFSET;
	size_t recv_offset = io_u_buf_off + RECV_OFFSET;
	void *send_ptr = cd->io_us_msgs + send_offset;
	void *recv_ptr = cd->io_us_msgs + recv_offset;
	GPSPMFlushRequest flush_req = GPSPM_FLUSH_REQUEST__INIT;
	size_t flush_req_size = 0;
	int ret;

	/* prepare a response buffer */
	if ((ret = rpma_recv(ccd->conn, cd->msg_mr, recv_offset, MAX_MSG_SIZE,
			recv_ptr))) {
		librpma_td_verror(td, ret, "rpma_recv");
		return -1;
	}

	/* prepare a flush message and pack it to a send buffer */
	flush_req.offset = first_io_u->offset;
	flush_req.length = len;
	flush_req.op_context = last_io_u->index;
	flush_req_size = gpspm_flush_request__get_packed_size(&flush_req);
	if (flush_req_size > MAX_MSG_SIZE) {
		log_err(
			"Packed flush request size is bigger than available send buffer space (%"
			PRIu64 " > %d)\n", flush_req_size, MAX_MSG_SIZE);
		return -1;
	}
	(void) gpspm_flush_request__pack(&flush_req, send_ptr);

	/* send the flush message */
	if ((ret = rpma_send(ccd->conn, cd->msg_mr, send_offset, flush_req_size,
			RPMA_F_COMPLETION_ALWAYS, NULL))) {
		librpma_td_verror(td, ret, "rpma_send");
		return -1;
	}

	++ccd->op_send_posted;

	return 0;
}

static int client_get_io_u_index(struct ibv_wc *wc, unsigned int *io_u_index)
{
	GPSPMFlushResponse *flush_resp;

	if (wc->opcode != IBV_WC_RECV)
		return 0;

	/* unpack a response from the received buffer */
	flush_resp = gpspm_flush_response__unpack(NULL,
			wc->byte_len, (void *)wc->wr_id);
	if (flush_resp == NULL) {
		log_err("Cannot unpack the flush response buffer\n");
		return -1;
	}

	memcpy(io_u_index, &flush_resp->op_context, sizeof(*io_u_index));

	gpspm_flush_response__free_unpacked(flush_resp, NULL);

	return 1;
}

FIO_STATIC struct ioengine_ops ioengine_client = {
	.name = "librpma_gpspm_client",
	.version = FIO_IOOPS_VERSION,
	.init = client_init,
	.post_init = client_post_init,
	.get_file_size = librpma_fio_client_get_file_size,
	.open_file = librpma_fio_file_nop,
	.queue = librpma_fio_client_queue,
	.commit = librpma_fio_client_commit,
	.getevents = librpma_fio_client_getevents,
	.event = librpma_fio_client_event,
	.errdetails = librpma_fio_client_errdetails,
	.close_file = librpma_fio_file_nop,
	.cleanup = client_cleanup,
	.flags = FIO_DISKLESSIO | FIO_ASYNCIO_SETS_ISSUE_TIME,
	.options = librpma_fio_options,
	.option_struct_size = sizeof(struct librpma_fio_options_values),
};

/* server side implementation */

#define IO_U_BUFF_OFF_SERVER(i) ((i) * IO_U_BUF_LEN)

typedef void (*librpma_fio_persist_fn)(const void *ptr, size_t size);

struct server_data {
	/* aligned td->orig_buffer */
	char *orig_buffer_aligned;

	/* resources for the messaging buffer from DRAM allocated by fio */
	struct rpma_mr_local *msg_mr;

	uint32_t msg_sqe_available; /* # of free SQ slots */

	/* in-memory queue */
	struct ibv_wc *msgs_queued;
	uint32_t msg_queued_nr;

	librpma_fio_persist_fn persist;
};

static int server_init(struct thread_data *td)
{
	struct librpma_fio_server_data *csd;
	struct server_data *sd;
	int ret = -1;

	if ((ret = librpma_fio_server_init(td)))
		return ret;

	csd = td->io_ops_data;

	/* allocate server's data */
	sd = calloc(1, sizeof(*sd));
	if (sd == NULL) {
		td_verror(td, errno, "calloc");
		goto err_server_cleanup;
	}

	/* allocate the in-memory queue */
	sd->msgs_queued = calloc(td->o.iodepth, sizeof(*sd->msgs_queued));
	if (sd->msgs_queued == NULL) {
		td_verror(td, errno, "calloc");
		goto err_free_sd;
	}

#ifdef CONFIG_LIBPMEM2_INSTALLED
	/* get the libpmem2 persist function from pmem2_map */
	sd->persist = pmem2_get_persist_fn(csd->mem.map);
#else
	sd->persist = pmem_persist;
#endif

	/*
	 * Ensure that a single io_u buffer can store both the SEND and RECV
	 * messages and that the io_us buffer allocation is page-size-aligned,
	 * which is required to register the memory for RDMA.
	 * User-provided values are intentionally ignored.
	 */
	td->o.max_bs[DDIR_READ] = IO_U_BUF_LEN;
	td->o.mem_align = page_size;

	csd->server_data = sd;

	return 0;

err_free_sd:
	free(sd);

err_server_cleanup:
	librpma_fio_server_cleanup(td);

	return -1;
}

static int server_post_init(struct thread_data *td)
{
	struct librpma_fio_server_data *csd = td->io_ops_data;
	struct server_data *sd = csd->server_data;
	size_t io_us_size;
	size_t io_u_buflen;
	int ret;

	/*
	 * td->orig_buffer is not aligned. The engine requires aligned io_us
	 * so FIO aligns up the address using the formula below.
	 */
	sd->orig_buffer_aligned = PTR_ALIGN(td->orig_buffer, page_mask) +
			td->o.mem_align;

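	/*
	 * Illustrative example (assuming page_size == 4096, so
	 * page_mask == 4095, and mem_align == page_size as set in
	 * server_init()): for orig_buffer == ...100f00, PTR_ALIGN rounds up
	 * to ...101000 and adding mem_align yields ...102000 - mirroring how
	 * fio itself computes the address of the first io_u buffer.
	 */
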
	/*
	 * XXX
	 * Each io_u message buffer contains recv and send messages.
	 * Aligning each of those buffers may potentially give
	 * some performance benefits.
	 */
	io_u_buflen = td_max_bs(td);

	/* check whether the io_u buffer is big enough */
	if (io_u_buflen < IO_U_BUF_LEN) {
		log_err(
			"blocksize too small to accommodate assumed maximal request/response pair size (%" PRIu64 " < %d)\n",
			io_u_buflen, IO_U_BUF_LEN);
		return -1;
	}

	/*
	 * Besides the space really consumed by io_us, td->orig_buffer_size
	 * contains padding which can be omitted from the memory registration.
	 */
	io_us_size = (unsigned long long)io_u_buflen *
			(unsigned long long)td->o.iodepth;

	if ((ret = rpma_mr_reg(csd->peer, sd->orig_buffer_aligned, io_us_size,
			RPMA_MR_USAGE_SEND | RPMA_MR_USAGE_RECV,
			&sd->msg_mr))) {
		librpma_td_verror(td, ret, "rpma_mr_reg");
		return -1;
	}

	return 0;
}

static void server_cleanup(struct thread_data *td)
{
	struct librpma_fio_server_data *csd = td->io_ops_data;
	struct server_data *sd;
	int ret;

	if (csd == NULL)
		return;

	sd = csd->server_data;

	if (sd != NULL) {
		/* rpma_mr_dereg(messaging buffer from DRAM) */
		if ((ret = rpma_mr_dereg(&sd->msg_mr)))
			librpma_td_verror(td, ret, "rpma_mr_dereg");

		free(sd->msgs_queued);
		free(sd);
	}

	librpma_fio_server_cleanup(td);
}

static int prepare_connection(struct thread_data *td,
		struct rpma_conn_req *conn_req)
{
	struct librpma_fio_server_data *csd = td->io_ops_data;
	struct server_data *sd = csd->server_data;
	int ret;
	int i;

	/* prepare buffers for the flush requests */
	sd->msg_sqe_available = td->o.iodepth;
	for (i = 0; i < td->o.iodepth; i++) {
		size_t offset_recv_msg = IO_U_BUFF_OFF_SERVER(i) + RECV_OFFSET;
		if ((ret = rpma_conn_req_recv(conn_req, sd->msg_mr,
				offset_recv_msg, MAX_MSG_SIZE,
				(const void *)(uintptr_t)i))) {
			librpma_td_verror(td, ret, "rpma_conn_req_recv");
			return ret;
		}
	}

	return 0;
}

static int server_open_file(struct thread_data *td, struct fio_file *f)
{
	struct librpma_fio_server_data *csd = td->io_ops_data;
	struct rpma_conn_cfg *cfg = NULL;
	uint16_t max_msg_num = td->o.iodepth;
	int ret;

	csd->prepare_connection = prepare_connection;

	/* create a connection configuration object */
	if ((ret = rpma_conn_cfg_new(&cfg))) {
		librpma_td_verror(td, ret, "rpma_conn_cfg_new");
		return -1;
	}

	/*
	 * Calculate the required queue sizes where:
	 * - the send queue (SQ) has to be big enough to accommodate
	 *   all possible flush responses (SENDs)
	 * - the receive queue (RQ) has to be big enough to accommodate
	 *   all flush requests (RECVs)
	 * - the completion queue (CQ) has to be big enough to accommodate
	 *   all success and error completions (sq_size + rq_size)
	 */
	if ((ret = rpma_conn_cfg_set_sq_size(cfg, max_msg_num))) {
		librpma_td_verror(td, ret, "rpma_conn_cfg_set_sq_size");
		goto err_cfg_delete;
	}
	if ((ret = rpma_conn_cfg_set_rq_size(cfg, max_msg_num))) {
		librpma_td_verror(td, ret, "rpma_conn_cfg_set_rq_size");
		goto err_cfg_delete;
	}
	if ((ret = rpma_conn_cfg_set_cq_size(cfg, max_msg_num * 2))) {
		librpma_td_verror(td, ret, "rpma_conn_cfg_set_cq_size");
		goto err_cfg_delete;
	}

	ret = librpma_fio_server_open_file(td, f, cfg);

err_cfg_delete:
	(void) rpma_conn_cfg_delete(&cfg);

	return ret;
}
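
/*
 * Server-side GPSPM flow (summary of the code below): each RECV completion
 * carries a packed GPSPMFlushRequest; server_qe_process() persists the
 * requested [offset, offset + length) range of the workspace, re-posts the
 * RECV buffer, and SENDs back a GPSPMFlushResponse carrying the request's
 * op_context so the client can match it to the originating io_u.
 */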
577
4ef7dd21 578static int server_qe_process(struct thread_data *td, struct ibv_wc *wc)
e4c4625f
JM
579{
580 struct librpma_fio_server_data *csd = td->io_ops_data;
581 struct server_data *sd = csd->server_data;
582 GPSPMFlushRequest *flush_req;
583 GPSPMFlushResponse flush_resp = GPSPM_FLUSH_RESPONSE__INIT;
584 size_t flush_resp_size = 0;
585 size_t send_buff_offset;
586 size_t recv_buff_offset;
587 size_t io_u_buff_offset;
588 void *send_buff_ptr;
589 void *recv_buff_ptr;
590 void *op_ptr;
591 int msg_index;
592 int ret;
593
594 /* calculate SEND/RECV pair parameters */
4ef7dd21 595 msg_index = (int)(uintptr_t)wc->wr_id;
e4c4625f
JM
596 io_u_buff_offset = IO_U_BUFF_OFF_SERVER(msg_index);
597 send_buff_offset = io_u_buff_offset + SEND_OFFSET;
598 recv_buff_offset = io_u_buff_offset + RECV_OFFSET;
599 send_buff_ptr = sd->orig_buffer_aligned + send_buff_offset;
600 recv_buff_ptr = sd->orig_buffer_aligned + recv_buff_offset;
601
602 /* unpack a flush request from the received buffer */
4ef7dd21 603 flush_req = gpspm_flush_request__unpack(NULL, wc->byte_len,
e4c4625f
JM
604 recv_buff_ptr);
605 if (flush_req == NULL) {
606 log_err("cannot unpack the flush request buffer\n");
607 goto err_terminate;
608 }
609
610 if (IS_NOT_THE_LAST_MESSAGE(flush_req)) {
611 op_ptr = csd->ws_ptr + flush_req->offset;
fb04caf1 612 sd->persist(op_ptr, flush_req->length);
e4c4625f
JM
613 } else {
614 /*
615 * This is the last message - the client is done.
616 */
617 gpspm_flush_request__free_unpacked(flush_req, NULL);
618 td->done = true;
619 return 0;
620 }
621
622 /* initiate the next receive operation */
623 if ((ret = rpma_recv(csd->conn, sd->msg_mr, recv_buff_offset,
624 MAX_MSG_SIZE,
625 (const void *)(uintptr_t)msg_index))) {
626 librpma_td_verror(td, ret, "rpma_recv");
627 goto err_free_unpacked;
628 }
629
630 /* prepare a flush response and pack it to a send buffer */
631 flush_resp.op_context = flush_req->op_context;
632 flush_resp_size = gpspm_flush_response__get_packed_size(&flush_resp);
633 if (flush_resp_size > MAX_MSG_SIZE) {
634 log_err(
635 "Size of the packed flush response is bigger than the available space of the send buffer (%"
636 PRIu64 " > %i\n", flush_resp_size, MAX_MSG_SIZE);
637 goto err_free_unpacked;
638 }
639
640 (void) gpspm_flush_response__pack(&flush_resp, send_buff_ptr);
641
642 /* send the flush response */
643 if ((ret = rpma_send(csd->conn, sd->msg_mr, send_buff_offset,
644 flush_resp_size, RPMA_F_COMPLETION_ALWAYS, NULL))) {
645 librpma_td_verror(td, ret, "rpma_send");
646 goto err_free_unpacked;
647 }
648 --sd->msg_sqe_available;
649
650 gpspm_flush_request__free_unpacked(flush_req, NULL);
651
652 return 0;
653
654err_free_unpacked:
655 gpspm_flush_request__free_unpacked(flush_req, NULL);
656
657err_terminate:
658 td->terminate = true;
659
660 return -1;
661}
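
/*
 * Illustrative example for server_queue_process() below (assuming 5 queued
 * RECV completions and only 3 free SQ slots): 3 requests are processed and
 * responded to, the remaining 2 are shifted to the front of the queue, and
 * msg_queued_nr drops from 5 to 2.
 */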

static inline int server_queue_process(struct thread_data *td)
{
	struct librpma_fio_server_data *csd = td->io_ops_data;
	struct server_data *sd = csd->server_data;
	int ret;
	int i;

	/* min(# of queue entries, # of SQ entries available) */
	uint32_t qes_to_process = min(sd->msg_queued_nr, sd->msg_sqe_available);
	if (qes_to_process == 0)
		return 0;

	/* process queued completions */
	for (i = 0; i < qes_to_process; ++i) {
		if ((ret = server_qe_process(td, &sd->msgs_queued[i])))
			return ret;
	}

	/* progress the queue */
	for (i = 0; i < sd->msg_queued_nr - qes_to_process; ++i) {
		memcpy(&sd->msgs_queued[i],
				&sd->msgs_queued[qes_to_process + i],
				sizeof(sd->msgs_queued[i]));
	}

	sd->msg_queued_nr -= qes_to_process;

	return 0;
}

static int server_cmpl_process(struct thread_data *td)
{
	struct librpma_fio_server_data *csd = td->io_ops_data;
	struct server_data *sd = csd->server_data;
	struct ibv_wc *wc = &sd->msgs_queued[sd->msg_queued_nr];
	struct librpma_fio_options_values *o = td->eo;
	int ret;

	ret = rpma_cq_get_wc(csd->cq, 1, wc, NULL);
	if (ret == RPMA_E_NO_COMPLETION) {
		if (o->busy_wait_polling)
			return 0; /* lack of completion is not an error */

		ret = rpma_cq_wait(csd->cq);
		if (ret == RPMA_E_NO_COMPLETION)
			return 0; /* lack of completion is not an error */
		if (ret) {
			librpma_td_verror(td, ret, "rpma_cq_wait");
			goto err_terminate;
		}

		ret = rpma_cq_get_wc(csd->cq, 1, wc, NULL);
		if (ret == RPMA_E_NO_COMPLETION)
			return 0; /* lack of completion is not an error */
		if (ret) {
			librpma_td_verror(td, ret, "rpma_cq_get_wc");
			goto err_terminate;
		}
	} else if (ret) {
		librpma_td_verror(td, ret, "rpma_cq_get_wc");
		goto err_terminate;
	}

	/* validate the completion */
	if (wc->status != IBV_WC_SUCCESS)
		goto err_terminate;

	if (wc->opcode == IBV_WC_RECV)
		++sd->msg_queued_nr;
	else if (wc->opcode == IBV_WC_SEND)
		++sd->msg_sqe_available;

	return 0;

err_terminate:
	td->terminate = true;

	return -1;
}

static enum fio_q_status server_queue(struct thread_data *td, struct io_u *io_u)
{
	do {
		if (server_cmpl_process(td))
			return FIO_Q_BUSY;

		if (server_queue_process(td))
			return FIO_Q_BUSY;

	} while (!td->done);

	return FIO_Q_COMPLETED;
}

FIO_STATIC struct ioengine_ops ioengine_server = {
	.name = "librpma_gpspm_server",
	.version = FIO_IOOPS_VERSION,
	.init = server_init,
	.post_init = server_post_init,
	.open_file = server_open_file,
	.close_file = librpma_fio_server_close_file,
	.queue = server_queue,
	.invalidate = librpma_fio_file_nop,
	.cleanup = server_cleanup,
	.flags = FIO_SYNCIO,
	.options = librpma_fio_options,
	.option_struct_size = sizeof(struct librpma_fio_options_values),
};

/* register both engines */

static void fio_init fio_librpma_gpspm_register(void)
{
	register_ioengine(&ioengine_client);
	register_ioengine(&ioengine_server);
}

static void fio_exit fio_librpma_gpspm_unregister(void)
{
	unregister_ioengine(&ioengine_client);
	unregister_ioengine(&ioengine_server);
}
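
/*
 * Usage sketch (illustrative and incomplete, not a shipped job file; the
 * fio tree ships example jobs for these engines, see
 * examples/librpma_gpspm-client.fio and examples/librpma_gpspm-server.fio
 * for the canonical versions; option names such as serverip and port come
 * from librpma_fio_options and should be verified there):
 *
 *   [server]                           [client]
 *   ioengine=librpma_gpspm_server      ioengine=librpma_gpspm_client
 *   serverip=<ip>                      serverip=<ip>
 *   port=7204                          port=7204
 *   filename=<pmem path>               rw=write
 *                                      iodepth=4
 *                                      blocksize=4k
 *
 * The server persists the client's RDMA writes on each flush request and
 * replies with a flush response, as implemented above.
 */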