engines/librpma_gpspm.c
/*
 * librpma_gpspm: IO engine that uses PMDK librpma to write data,
 * based on General Purpose Server Persistency Method
 *
 * Copyright 2020-2021, Intel Corporation
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License,
 * version 2 as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 */

#include "librpma_fio.h"

#include <libpmem.h>

/* Generated by the protocol buffer compiler from: librpma_gpspm_flush.proto */
#include "librpma_gpspm_flush.pb-c.h"
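
/*
 * The generated message types used below are GPSPMFlushRequest (fields
 * offset, length and op_context) and GPSPMFlushResponse (field op_context);
 * see librpma_gpspm_flush.proto for the authoritative definitions.
 */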

#define MAX_MSG_SIZE (512)
#define IO_U_BUF_LEN (2 * MAX_MSG_SIZE)
#define SEND_OFFSET (0)
#define RECV_OFFSET (SEND_OFFSET + MAX_MSG_SIZE)
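
/*
 * Layout example: with MAX_MSG_SIZE == 512, each io_u message buffer spans
 * IO_U_BUF_LEN == 1024 bytes; the SEND message occupies bytes [0, 512) and
 * the RECV message bytes [512, 1024) of the buffer.
 */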

#define GPSPM_FLUSH_REQUEST__LAST \
	{ PROTOBUF_C_MESSAGE_INIT(&gpspm_flush_request__descriptor), 0, 0, 0 }

/*
 * 'Flush_req_last' is the last flush request the client sends to the
 * server to indicate that the client is done.
 */
static const GPSPMFlushRequest Flush_req_last = GPSPM_FLUSH_REQUEST__LAST;

#define IS_NOT_THE_LAST_MESSAGE(flush_req) \
	(flush_req->length != Flush_req_last.length || \
	flush_req->offset != Flush_req_last.offset)
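
/*
 * Note: GPSPM_FLUSH_REQUEST__LAST zero-initializes both length and offset,
 * so a request carrying length == 0 and offset == 0 is interpreted as the
 * termination notice.
 */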

/* client side implementation */

/* get next io_u message buffer in the round-robin fashion */
#define IO_U_NEXT_BUF_OFF_CLIENT(cd) \
	(IO_U_BUF_LEN * ((cd->msg_curr++) % cd->msg_num))
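
/*
 * Example: with cd->msg_num == 4, successive calls return the offsets
 * 0, 1024, 2048, 3072, 0, ... (multiples of IO_U_BUF_LEN).
 */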

struct client_data {
	/* memory for the send and receive buffers */
	char *io_us_msgs;

	/* resources for the messaging buffer */
	uint32_t msg_num;
	uint32_t msg_curr;
	struct rpma_mr_local *msg_mr;
};

static inline int client_io_flush(struct thread_data *td,
		struct io_u *first_io_u, struct io_u *last_io_u,
		unsigned long long int len);

static int client_get_io_u_index(struct rpma_completion *cmpl,
		unsigned int *io_u_index);

static int client_init(struct thread_data *td)
{
	struct librpma_fio_client_data *ccd;
	struct client_data *cd;
	uint32_t write_num;
	struct rpma_conn_cfg *cfg = NULL;
	int ret;

	/*
	 * not supported:
	 * - readwrite = read / trim / randread / randtrim /
	 *   / rw / randrw / trimwrite
	 */
	if (td_read(td) || td_trim(td)) {
		td_verror(td, EINVAL, "Not supported mode.");
		return -1;
	}

	/* allocate client's data */
	cd = calloc(1, sizeof(*cd));
	if (cd == NULL) {
		td_verror(td, errno, "calloc");
		return -1;
	}

	/*
	 * Calculate the required number of WRITEs and FLUSHes.
	 *
	 * Note: Each flush is a request (SEND) and response (RECV) pair.
	 */
	if (td_random(td)) {
		write_num = td->o.iodepth; /* WRITE * N */
		cd->msg_num = td->o.iodepth; /* FLUSH * N */
	} else {
		if (td->o.sync_io) {
			write_num = 1; /* WRITE */
			cd->msg_num = 1; /* FLUSH */
		} else {
			write_num = td->o.iodepth; /* WRITE * N */
			/*
			 * FLUSH * B where:
			 * - B == ceil(iodepth / iodepth_batch)
			 *   which is the number of batches for N writes
			 */
			cd->msg_num = LIBRPMA_FIO_CEIL(td->o.iodepth,
					td->o.iodepth_batch);
		}
	}
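
	/*
	 * Example: a sequential asynchronous job with iodepth=16 and
	 * iodepth_batch=8 yields write_num == 16 and
	 * cd->msg_num == LIBRPMA_FIO_CEIL(16, 8) == 2.
	 */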

	/* create a connection configuration object */
	if ((ret = rpma_conn_cfg_new(&cfg))) {
		librpma_td_verror(td, ret, "rpma_conn_cfg_new");
		goto err_free_cd;
	}

	/*
	 * Calculate the required queue sizes where:
	 * - the send queue (SQ) has to be big enough to accommodate
	 *   all io_us (WRITEs) and all flush requests (SENDs)
	 * - the receive queue (RQ) has to be big enough to accommodate
	 *   all flush responses (RECVs)
	 * - the completion queue (CQ) has to be big enough to accommodate all
	 *   success and error completions (sq_size + rq_size)
	 */
	if ((ret = rpma_conn_cfg_set_sq_size(cfg, write_num + cd->msg_num))) {
		librpma_td_verror(td, ret, "rpma_conn_cfg_set_sq_size");
		goto err_cfg_delete;
	}
	if ((ret = rpma_conn_cfg_set_rq_size(cfg, cd->msg_num))) {
		librpma_td_verror(td, ret, "rpma_conn_cfg_set_rq_size");
		goto err_cfg_delete;
	}
	if ((ret = rpma_conn_cfg_set_cq_size(cfg, write_num + cd->msg_num * 2))) {
		librpma_td_verror(td, ret, "rpma_conn_cfg_set_cq_size");
		goto err_cfg_delete;
	}
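
	/*
	 * Continuing the example above (write_num == 16, msg_num == 2):
	 * sq_size == 16 + 2 == 18, rq_size == 2 and
	 * cq_size == 16 + 2 * 2 == 20.
	 */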

	if (librpma_fio_client_init(td, cfg))
		goto err_cfg_delete;

	ccd = td->io_ops_data;

	if (ccd->ws->direct_write_to_pmem &&
	    ccd->server_mr_flush_type == RPMA_FLUSH_TYPE_PERSISTENT &&
	    td->thread_number == 1) {
		/* XXX log_info mixes with the JSON output */
		log_err(
			"Note: The server side supports Direct Write to PMem and it is equipped with PMem (direct_write_to_pmem).\n"
			"You can use librpma_client and librpma_server engines for better performance instead of GPSPM.\n");
	}

	/* validate the server's RQ capacity */
	if (cd->msg_num > ccd->ws->max_msg_num) {
		log_err(
			"server's RQ size (iodepth) too small to handle the client's workspace requirements (%u < %u)\n",
			ccd->ws->max_msg_num, cd->msg_num);
		goto err_cleanup_common;
	}

	if ((ret = rpma_conn_cfg_delete(&cfg))) {
		librpma_td_verror(td, ret, "rpma_conn_cfg_delete");
		/* non-fatal error - continue */
	}

	ccd->flush = client_io_flush;
	ccd->get_io_u_index = client_get_io_u_index;
	ccd->client_data = cd;

	return 0;

err_cleanup_common:
	librpma_fio_client_cleanup(td);

err_cfg_delete:
	(void) rpma_conn_cfg_delete(&cfg);

err_free_cd:
	free(cd);

	return -1;
}

static int client_post_init(struct thread_data *td)
{
	struct librpma_fio_client_data *ccd = td->io_ops_data;
	struct client_data *cd = ccd->client_data;
	unsigned int io_us_msgs_size;
	int ret;

	/* message buffers initialization and registration */
	io_us_msgs_size = cd->msg_num * IO_U_BUF_LEN;
	if ((ret = posix_memalign((void **)&cd->io_us_msgs, page_size,
			io_us_msgs_size))) {
		td_verror(td, ret, "posix_memalign");
		return ret;
	}
	if ((ret = rpma_mr_reg(ccd->peer, cd->io_us_msgs, io_us_msgs_size,
			RPMA_MR_USAGE_SEND | RPMA_MR_USAGE_RECV,
			&cd->msg_mr))) {
		librpma_td_verror(td, ret, "rpma_mr_reg");
		return ret;
	}

	return librpma_fio_client_post_init(td);
}

static void client_cleanup(struct thread_data *td)
{
	struct librpma_fio_client_data *ccd = td->io_ops_data;
	struct client_data *cd;
	size_t flush_req_size;
	size_t io_u_buf_off;
	size_t send_offset;
	void *send_ptr;
	int ret;

	if (ccd == NULL)
		return;

	cd = ccd->client_data;
	if (cd == NULL) {
		librpma_fio_client_cleanup(td);
		return;
	}

	/*
	 * Make sure all SEND completions are collected so there are free
	 * slots in the SQ for the last SEND message.
	 *
	 * Note: Even if any operation fails, we can still send the
	 * termination notice.
	 */
	(void) librpma_fio_client_io_complete_all_sends(td);

	/* prepare the last flush message and pack it to the send buffer */
	flush_req_size = gpspm_flush_request__get_packed_size(&Flush_req_last);
	if (flush_req_size > MAX_MSG_SIZE) {
		log_err(
			"Packed flush request size is bigger than available send buffer space (%zu > %d)\n",
			flush_req_size, MAX_MSG_SIZE);
	} else {
		io_u_buf_off = IO_U_NEXT_BUF_OFF_CLIENT(cd);
		send_offset = io_u_buf_off + SEND_OFFSET;
		send_ptr = cd->io_us_msgs + send_offset;
		(void) gpspm_flush_request__pack(&Flush_req_last, send_ptr);

		/* send the flush message */
		if ((ret = rpma_send(ccd->conn, cd->msg_mr, send_offset,
				flush_req_size, RPMA_F_COMPLETION_ALWAYS,
				NULL)))
			librpma_td_verror(td, ret, "rpma_send");

		++ccd->op_send_posted;

		/* wait for the SEND to complete */
		(void) librpma_fio_client_io_complete_all_sends(td);
	}

	/* deregister the messaging buffer memory */
	if ((ret = rpma_mr_dereg(&cd->msg_mr)))
		librpma_td_verror(td, ret, "rpma_mr_dereg");

	free(ccd->client_data);

	librpma_fio_client_cleanup(td);
}

static inline int client_io_flush(struct thread_data *td,
		struct io_u *first_io_u, struct io_u *last_io_u,
		unsigned long long int len)
{
	struct librpma_fio_client_data *ccd = td->io_ops_data;
	struct client_data *cd = ccd->client_data;
	size_t io_u_buf_off = IO_U_NEXT_BUF_OFF_CLIENT(cd);
	size_t send_offset = io_u_buf_off + SEND_OFFSET;
	size_t recv_offset = io_u_buf_off + RECV_OFFSET;
	void *send_ptr = cd->io_us_msgs + send_offset;
	void *recv_ptr = cd->io_us_msgs + recv_offset;
	GPSPMFlushRequest flush_req = GPSPM_FLUSH_REQUEST__INIT;
	size_t flush_req_size = 0;
	int ret;

	/* prepare a response buffer */
	if ((ret = rpma_recv(ccd->conn, cd->msg_mr, recv_offset, MAX_MSG_SIZE,
			recv_ptr))) {
		librpma_td_verror(td, ret, "rpma_recv");
		return -1;
	}

	/* prepare a flush message and pack it to a send buffer */
	flush_req.offset = first_io_u->offset;
	flush_req.length = len;
	flush_req.op_context = last_io_u->index;
	flush_req_size = gpspm_flush_request__get_packed_size(&flush_req);
	if (flush_req_size > MAX_MSG_SIZE) {
		log_err(
			"Packed flush request size is bigger than available send buffer space (%zu > %d)\n",
			flush_req_size, MAX_MSG_SIZE);
		return -1;
	}
	(void) gpspm_flush_request__pack(&flush_req, send_ptr);

	/* send the flush message */
	if ((ret = rpma_send(ccd->conn, cd->msg_mr, send_offset, flush_req_size,
			RPMA_F_COMPLETION_ALWAYS, NULL))) {
		librpma_td_verror(td, ret, "rpma_send");
		return -1;
	}

	++ccd->op_send_posted;

	return 0;
}

static int client_get_io_u_index(struct rpma_completion *cmpl,
		unsigned int *io_u_index)
{
	GPSPMFlushResponse *flush_resp;

	if (cmpl->op != RPMA_OP_RECV)
		return 0;

	/* unpack a response from the received buffer */
	flush_resp = gpspm_flush_response__unpack(NULL,
			cmpl->byte_len, cmpl->op_context);
	if (flush_resp == NULL) {
		log_err("Cannot unpack the flush response buffer\n");
		return -1;
	}

	memcpy(io_u_index, &flush_resp->op_context, sizeof(*io_u_index));

	gpspm_flush_response__free_unpacked(flush_resp, NULL);

	return 1;
}

FIO_STATIC struct ioengine_ops ioengine_client = {
	.name = "librpma_gpspm_client",
	.version = FIO_IOOPS_VERSION,
	.init = client_init,
	.post_init = client_post_init,
	.get_file_size = librpma_fio_client_get_file_size,
	.open_file = librpma_fio_file_nop,
	.queue = librpma_fio_client_queue,
	.commit = librpma_fio_client_commit,
	.getevents = librpma_fio_client_getevents,
	.event = librpma_fio_client_event,
	.errdetails = librpma_fio_client_errdetails,
	.close_file = librpma_fio_file_nop,
	.cleanup = client_cleanup,
	.flags = FIO_DISKLESSIO,
	.options = librpma_fio_options,
	.option_struct_size = sizeof(struct librpma_fio_options_values),
};
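
/*
 * A minimal client job sketch (a hypothetical example; it assumes the
 * serverip and port options defined in librpma_fio_options, and the
 * remaining values are illustrative):
 *
 *	[client]
 *	ioengine=librpma_gpspm_client
 *	serverip=192.168.0.1
 *	port=7204
 *	rw=write
 *	iodepth=2
 *	blocksize=4KiB
 */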

/* server side implementation */

#define IO_U_BUFF_OFF_SERVER(i) (i * IO_U_BUF_LEN)
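
/*
 * Example: the i-th message slot's SEND/RECV pair starts at
 * i * IO_U_BUF_LEN (i * 1024 bytes), mirroring the client-side layout.
 */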

struct server_data {
	/* aligned td->orig_buffer */
	char *orig_buffer_aligned;

	/* resources for the messaging buffer from DRAM allocated by fio */
	struct rpma_mr_local *msg_mr;

	uint32_t msg_sqe_available; /* # of free SQ slots */

	/* in-memory queues */
	struct rpma_completion *msgs_queued;
	uint32_t msg_queued_nr;
};

static int server_init(struct thread_data *td)
{
	struct librpma_fio_server_data *csd;
	struct server_data *sd;
	int ret = -1;

	if ((ret = librpma_fio_server_init(td)))
		return ret;

	csd = td->io_ops_data;

	/* allocate server's data */
	sd = calloc(1, sizeof(*sd));
	if (sd == NULL) {
		td_verror(td, errno, "calloc");
		goto err_server_cleanup;
	}

	/* allocate in-memory queue */
	sd->msgs_queued = calloc(td->o.iodepth, sizeof(*sd->msgs_queued));
	if (sd->msgs_queued == NULL) {
		td_verror(td, errno, "calloc");
		goto err_free_sd;
	}

	/*
	 * Ensure a single io_u buffer can store both SEND and RECV messages
	 * and that the io_us buffer allocation is page-size-aligned, which is
	 * required to register the memory for RDMA. User-provided values are
	 * intentionally ignored.
	 */
	td->o.max_bs[DDIR_READ] = IO_U_BUF_LEN;
	td->o.mem_align = page_size;

	csd->server_data = sd;

	return 0;

err_free_sd:
	free(sd);

err_server_cleanup:
	librpma_fio_server_cleanup(td);

	return -1;
}

static int server_post_init(struct thread_data *td)
{
	struct librpma_fio_server_data *csd = td->io_ops_data;
	struct server_data *sd = csd->server_data;
	size_t io_us_size;
	size_t io_u_buflen;
	int ret;

	/*
	 * td->orig_buffer is not aligned. The engine requires aligned io_us,
	 * so fio aligns the address up using the formula below.
	 */
	sd->orig_buffer_aligned = PTR_ALIGN(td->orig_buffer, page_mask) +
			td->o.mem_align;
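
	/*
	 * Alignment example: with page_size == 4096, an orig_buffer at 0x1234
	 * is rounded up by PTR_ALIGN to 0x2000 and then offset by mem_align
	 * (one page, set in server_init) to 0x3000, so every io_u starts
	 * page-aligned.
	 */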

	/*
	 * XXX
	 * Each io_u message buffer contains recv and send messages.
	 * Aligning each of those buffers may potentially give
	 * some performance benefits.
	 */
	io_u_buflen = td_max_bs(td);

	/* check whether the io_u buffer is big enough */
	if (io_u_buflen < IO_U_BUF_LEN) {
		log_err(
			"blocksize too small to accommodate assumed maximal request/response pair size (%zu < %d)\n",
			io_u_buflen, IO_U_BUF_LEN);
		return -1;
	}

	/*
	 * Besides the space actually consumed by io_us, td->orig_buffer_size
	 * also includes padding which can be omitted from the memory
	 * registration.
	 */
	io_us_size = (unsigned long long)io_u_buflen *
			(unsigned long long)td->o.iodepth;

	if ((ret = rpma_mr_reg(csd->peer, sd->orig_buffer_aligned, io_us_size,
			RPMA_MR_USAGE_SEND | RPMA_MR_USAGE_RECV,
			&sd->msg_mr))) {
		librpma_td_verror(td, ret, "rpma_mr_reg");
		return -1;
	}

	return 0;
}

static void server_cleanup(struct thread_data *td)
{
	struct librpma_fio_server_data *csd = td->io_ops_data;
	struct server_data *sd;
	int ret;

	if (csd == NULL)
		return;

	sd = csd->server_data;

	if (sd != NULL) {
		/* rpma_mr_dereg(messaging buffer from DRAM) */
		if ((ret = rpma_mr_dereg(&sd->msg_mr)))
			librpma_td_verror(td, ret, "rpma_mr_dereg");

		free(sd->msgs_queued);
		free(sd);
	}

	librpma_fio_server_cleanup(td);
}

static int prepare_connection(struct thread_data *td,
		struct rpma_conn_req *conn_req)
{
	struct librpma_fio_server_data *csd = td->io_ops_data;
	struct server_data *sd = csd->server_data;
	int ret;
	int i;

	/* prepare buffers for flush requests */
	sd->msg_sqe_available = td->o.iodepth;
	for (i = 0; i < td->o.iodepth; i++) {
		size_t offset_recv_msg = IO_U_BUFF_OFF_SERVER(i) + RECV_OFFSET;
		if ((ret = rpma_conn_req_recv(conn_req, sd->msg_mr,
				offset_recv_msg, MAX_MSG_SIZE,
				(const void *)(uintptr_t)i))) {
			librpma_td_verror(td, ret, "rpma_conn_req_recv");
			return ret;
		}
	}

	return 0;
}

static int server_open_file(struct thread_data *td, struct fio_file *f)
{
	struct librpma_fio_server_data *csd = td->io_ops_data;
	struct rpma_conn_cfg *cfg = NULL;
	uint16_t max_msg_num = td->o.iodepth;
	int ret;

	csd->prepare_connection = prepare_connection;

	/* create a connection configuration object */
	if ((ret = rpma_conn_cfg_new(&cfg))) {
		librpma_td_verror(td, ret, "rpma_conn_cfg_new");
		return -1;
	}

	/*
	 * Calculate the required queue sizes where:
	 * - the send queue (SQ) has to be big enough to accommodate
	 *   all possible flush responses (SENDs)
	 * - the receive queue (RQ) has to be big enough to accommodate
	 *   all flush requests (RECVs)
	 * - the completion queue (CQ) has to be big enough to accommodate
	 *   all success and error completions (sq_size + rq_size)
	 */
	if ((ret = rpma_conn_cfg_set_sq_size(cfg, max_msg_num))) {
		librpma_td_verror(td, ret, "rpma_conn_cfg_set_sq_size");
		goto err_cfg_delete;
	}
	if ((ret = rpma_conn_cfg_set_rq_size(cfg, max_msg_num))) {
		librpma_td_verror(td, ret, "rpma_conn_cfg_set_rq_size");
		goto err_cfg_delete;
	}
	if ((ret = rpma_conn_cfg_set_cq_size(cfg, max_msg_num * 2))) {
		librpma_td_verror(td, ret, "rpma_conn_cfg_set_cq_size");
		goto err_cfg_delete;
	}

	ret = librpma_fio_server_open_file(td, f, cfg);

err_cfg_delete:
	(void) rpma_conn_cfg_delete(&cfg);

	return ret;
}

static int server_qe_process(struct thread_data *td,
		struct rpma_completion *cmpl)
{
	struct librpma_fio_server_data *csd = td->io_ops_data;
	struct server_data *sd = csd->server_data;
	GPSPMFlushRequest *flush_req;
	GPSPMFlushResponse flush_resp = GPSPM_FLUSH_RESPONSE__INIT;
	size_t flush_resp_size = 0;
	size_t send_buff_offset;
	size_t recv_buff_offset;
	size_t io_u_buff_offset;
	void *send_buff_ptr;
	void *recv_buff_ptr;
	void *op_ptr;
	int msg_index;
	int ret;

	/* calculate SEND/RECV pair parameters */
	msg_index = (int)(uintptr_t)cmpl->op_context;
	io_u_buff_offset = IO_U_BUFF_OFF_SERVER(msg_index);
	send_buff_offset = io_u_buff_offset + SEND_OFFSET;
	recv_buff_offset = io_u_buff_offset + RECV_OFFSET;
	send_buff_ptr = sd->orig_buffer_aligned + send_buff_offset;
	recv_buff_ptr = sd->orig_buffer_aligned + recv_buff_offset;

	/* unpack a flush request from the received buffer */
	flush_req = gpspm_flush_request__unpack(NULL, cmpl->byte_len,
			recv_buff_ptr);
	if (flush_req == NULL) {
		log_err("cannot unpack the flush request buffer\n");
		goto err_terminate;
	}

	if (IS_NOT_THE_LAST_MESSAGE(flush_req)) {
		op_ptr = csd->ws_ptr + flush_req->offset;
		pmem_persist(op_ptr, flush_req->length);
	} else {
		/*
		 * This is the last message - the client is done.
		 */
		gpspm_flush_request__free_unpacked(flush_req, NULL);
		td->done = true;
		return 0;
	}

	/* initiate the next receive operation */
	if ((ret = rpma_recv(csd->conn, sd->msg_mr, recv_buff_offset,
			MAX_MSG_SIZE,
			(const void *)(uintptr_t)msg_index))) {
		librpma_td_verror(td, ret, "rpma_recv");
		goto err_free_unpacked;
	}

	/* prepare a flush response and pack it to a send buffer */
	flush_resp.op_context = flush_req->op_context;
	flush_resp_size = gpspm_flush_response__get_packed_size(&flush_resp);
	if (flush_resp_size > MAX_MSG_SIZE) {
		log_err(
			"Size of the packed flush response is bigger than the available space of the send buffer (%zu > %i)\n",
			flush_resp_size, MAX_MSG_SIZE);
		goto err_free_unpacked;
	}

	(void) gpspm_flush_response__pack(&flush_resp, send_buff_ptr);

	/* send the flush response */
	if ((ret = rpma_send(csd->conn, sd->msg_mr, send_buff_offset,
			flush_resp_size, RPMA_F_COMPLETION_ALWAYS, NULL))) {
		librpma_td_verror(td, ret, "rpma_send");
		goto err_free_unpacked;
	}
	--sd->msg_sqe_available;

	gpspm_flush_request__free_unpacked(flush_req, NULL);

	return 0;

err_free_unpacked:
	gpspm_flush_request__free_unpacked(flush_req, NULL);

err_terminate:
	td->terminate = true;

	return -1;
}

static inline int server_queue_process(struct thread_data *td)
{
	struct librpma_fio_server_data *csd = td->io_ops_data;
	struct server_data *sd = csd->server_data;
	int ret;
	int i;

	/* min(# of queued entries, # of SQ entries available) */
	uint32_t qes_to_process = min(sd->msg_queued_nr, sd->msg_sqe_available);
	if (qes_to_process == 0)
		return 0;

	/* process queued completions */
	for (i = 0; i < qes_to_process; ++i) {
		if ((ret = server_qe_process(td, &sd->msgs_queued[i])))
			return ret;
	}

	/* progress the queue */
	for (i = 0; i < sd->msg_queued_nr - qes_to_process; ++i) {
		memcpy(&sd->msgs_queued[i],
			&sd->msgs_queued[qes_to_process + i],
			sizeof(sd->msgs_queued[i]));
	}

	sd->msg_queued_nr -= qes_to_process;

	return 0;
}

static int server_cmpl_process(struct thread_data *td)
{
	struct librpma_fio_server_data *csd = td->io_ops_data;
	struct server_data *sd = csd->server_data;
	struct rpma_completion *cmpl = &sd->msgs_queued[sd->msg_queued_nr];
	struct librpma_fio_options_values *o = td->eo;
	int ret;

	ret = rpma_conn_completion_get(csd->conn, cmpl);
	if (ret == RPMA_E_NO_COMPLETION) {
		if (o->busy_wait_polling == 0) {
			ret = rpma_conn_completion_wait(csd->conn);
			if (ret == RPMA_E_NO_COMPLETION) {
				/* lack of completion is not an error */
				return 0;
			} else if (ret != 0) {
				librpma_td_verror(td, ret, "rpma_conn_completion_wait");
				goto err_terminate;
			}

			ret = rpma_conn_completion_get(csd->conn, cmpl);
			if (ret == RPMA_E_NO_COMPLETION) {
				/* lack of completion is not an error */
				return 0;
			} else if (ret != 0) {
				librpma_td_verror(td, ret, "rpma_conn_completion_get");
				goto err_terminate;
			}
		} else {
			/* lack of completion is not an error */
			return 0;
		}
	} else if (ret != 0) {
		librpma_td_verror(td, ret, "rpma_conn_completion_get");
		goto err_terminate;
	}

	/* validate the completion */
	if (cmpl->op_status != IBV_WC_SUCCESS)
		goto err_terminate;

	if (cmpl->op == RPMA_OP_RECV)
		++sd->msg_queued_nr;
	else if (cmpl->op == RPMA_OP_SEND)
		++sd->msg_sqe_available;

	return 0;

err_terminate:
	td->terminate = true;

	return -1;
}

static enum fio_q_status server_queue(struct thread_data *td, struct io_u *io_u)
{
	do {
		if (server_cmpl_process(td))
			return FIO_Q_BUSY;

		if (server_queue_process(td))
			return FIO_Q_BUSY;

	} while (!td->done);

	return FIO_Q_COMPLETED;
}

FIO_STATIC struct ioengine_ops ioengine_server = {
	.name = "librpma_gpspm_server",
	.version = FIO_IOOPS_VERSION,
	.init = server_init,
	.post_init = server_post_init,
	.open_file = server_open_file,
	.close_file = librpma_fio_server_close_file,
	.queue = server_queue,
	.invalidate = librpma_fio_file_nop,
	.cleanup = server_cleanup,
	.flags = FIO_SYNCIO,
	.options = librpma_fio_options,
	.option_struct_size = sizeof(struct librpma_fio_options_values),
};
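
/*
 * A minimal server job sketch (a hypothetical example; it assumes the same
 * serverip/port options as the client side and a PMem-backed filename, and
 * the remaining values are illustrative):
 *
 *	[server]
 *	ioengine=librpma_gpspm_server
 *	serverip=192.168.0.1
 *	port=7204
 *	rw=write
 *	size=100MiB
 *	filename=/dev/dax0.0
 */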

/* register both engines */

static void fio_init fio_librpma_gpspm_register(void)
{
	register_ioengine(&ioengine_client);
	register_ioengine(&ioengine_server);
}

static void fio_exit fio_librpma_gpspm_unregister(void)
{
	unregister_ioengine(&ioengine_client);
	unregister_ioengine(&ioengine_server);
}