/*
 * librpma_apm: IO engine that uses PMDK librpma to read and write data,
 *		based on Appliance Persistency Method
 *
 * Copyright 2020-2021, Intel Corporation
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License,
 * version 2 as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 */
17 #include "librpma_fio.h"
19 /* client side implementation */
21 static inline int client_io_flush(struct thread_data *td,
22 struct io_u *first_io_u, struct io_u *last_io_u,
23 unsigned long long int len);
25 static int client_get_io_u_index(struct ibv_wc *wc, unsigned int *io_u_index);
27 static int client_init(struct thread_data *td)
29 struct librpma_fio_client_data *ccd;
32 struct rpma_conn_cfg *cfg = NULL;
33 struct rpma_peer_cfg *pcfg = NULL;
36 /* not supported readwrite = trim / randtrim / trimwrite */
38 td_verror(td, EINVAL, "Not supported mode.");
43 * Calculate the required queue sizes where:
44 * - the send queue (SQ) has to be big enough to accommodate
45 * all io_us (WRITEs) and all flush requests (FLUSHes)
46 * - the completion queue (CQ) has to be big enough to accommodate all
47 * success and error completions (cq_size = sq_size)
49 if (td_random(td) || td_rw(td)) {
51 * sq_size = max(rand_read_sq_size, rand_write_sq_size)
52 * where rand_read_sq_size < rand_write_sq_size because read
53 * does not require flush afterwards
54 * rand_write_sq_size = N * (WRITE + FLUSH)
56 * Note: rw is no different from random write since having
57 * interleaved reads with writes in extreme forces you to flush
58 * as often as when the writes are random.
60 sq_size = 2 * td->o.iodepth;
61 } else if (td_write(td)) {
62 /* sequential TD_DDIR_WRITE only */
64 sq_size = 2; /* WRITE + FLUSH */
67 * N * WRITE + B * FLUSH where:
68 * - B == ceil(iodepth / iodepth_batch)
69 * which is the number of batches for N writes
71 sq_size = td->o.iodepth + LIBRPMA_FIO_CEIL(td->o.iodepth,
75 /* TD_DDIR_READ only */
77 sq_size = 1; /* READ */
79 sq_size = td->o.iodepth; /* N x READ */
84 /* create a connection configuration object */
85 if ((ret = rpma_conn_cfg_new(&cfg))) {
86 librpma_td_verror(td, ret, "rpma_conn_cfg_new");
90 /* apply queue sizes */
91 if ((ret = rpma_conn_cfg_set_sq_size(cfg, sq_size))) {
92 librpma_td_verror(td, ret, "rpma_conn_cfg_set_sq_size");
95 if ((ret = rpma_conn_cfg_set_cq_size(cfg, cq_size))) {
96 librpma_td_verror(td, ret, "rpma_conn_cfg_set_cq_size");
100 if (librpma_fio_client_init(td, cfg))
103 ccd = td->io_ops_data;
105 if (ccd->server_mr_flush_type == RPMA_FLUSH_TYPE_PERSISTENT) {
106 if (!ccd->ws->direct_write_to_pmem) {
107 if (td->thread_number == 1)
109 "Fio librpma engine will not work until the Direct Write to PMem on the server side is possible (direct_write_to_pmem)\n");
110 goto err_cleanup_common;
113 /* configure peer's direct write to pmem support */
114 if ((ret = rpma_peer_cfg_new(&pcfg))) {
115 librpma_td_verror(td, ret, "rpma_peer_cfg_new");
116 goto err_cleanup_common;
119 if ((ret = rpma_peer_cfg_set_direct_write_to_pmem(pcfg, true))) {
120 librpma_td_verror(td, ret,
121 "rpma_peer_cfg_set_direct_write_to_pmem");
122 (void) rpma_peer_cfg_delete(&pcfg);
123 goto err_cleanup_common;
126 if ((ret = rpma_conn_apply_remote_peer_cfg(ccd->conn, pcfg))) {
127 librpma_td_verror(td, ret,
128 "rpma_conn_apply_remote_peer_cfg");
129 (void) rpma_peer_cfg_delete(&pcfg);
130 goto err_cleanup_common;
133 (void) rpma_peer_cfg_delete(&pcfg);
134 } else if (td->thread_number == 1) {
135 /* XXX log_info mixes with the JSON output */
137 "Note: Direct Write to PMem is not supported by default nor required if you use DRAM instead of PMem on the server side (direct_write_to_pmem).\n"
138 "Remember that flushing to DRAM does not make your data persistent and may be used only for experimental purposes.\n");
141 if ((ret = rpma_conn_cfg_delete(&cfg))) {
142 librpma_td_verror(td, ret, "rpma_conn_cfg_delete");
143 /* non fatal error - continue */
146 ccd->flush = client_io_flush;
147 ccd->get_io_u_index = client_get_io_u_index;
152 librpma_fio_client_cleanup(td);
155 (void) rpma_conn_cfg_delete(&cfg);
160 static void client_cleanup(struct thread_data *td)
162 struct librpma_fio_client_data *ccd = td->io_ops_data;
167 free(ccd->client_data);
169 librpma_fio_client_cleanup(td);
172 static inline int client_io_flush(struct thread_data *td,
173 struct io_u *first_io_u, struct io_u *last_io_u,
174 unsigned long long int len)
176 struct librpma_fio_client_data *ccd = td->io_ops_data;
177 size_t dst_offset = first_io_u->offset;
180 if ((ret = rpma_flush(ccd->conn, ccd->server_mr, dst_offset, len,
181 ccd->server_mr_flush_type, RPMA_F_COMPLETION_ALWAYS,
182 (void *)(uintptr_t)last_io_u->index))) {
183 librpma_td_verror(td, ret, "rpma_flush");
190 static int client_get_io_u_index(struct ibv_wc *wc, unsigned int *io_u_index)
192 memcpy(io_u_index, &wc->wr_id, sizeof(*io_u_index));
197 FIO_STATIC struct ioengine_ops ioengine_client = {
198 .name = "librpma_apm_client",
199 .version = FIO_IOOPS_VERSION,
201 .post_init = librpma_fio_client_post_init,
202 .get_file_size = librpma_fio_client_get_file_size,
203 .open_file = librpma_fio_file_nop,
204 .queue = librpma_fio_client_queue,
205 .commit = librpma_fio_client_commit,
206 .getevents = librpma_fio_client_getevents,
207 .event = librpma_fio_client_event,
208 .errdetails = librpma_fio_client_errdetails,
209 .close_file = librpma_fio_file_nop,
210 .cleanup = client_cleanup,
211 .flags = FIO_DISKLESSIO | FIO_ASYNCIO_SETS_ISSUE_TIME,
212 .options = librpma_fio_options,
213 .option_struct_size = sizeof(struct librpma_fio_options_values),
216 /* server side implementation */
218 static int server_open_file(struct thread_data *td, struct fio_file *f)
220 return librpma_fio_server_open_file(td, f, NULL);
223 static enum fio_q_status server_queue(struct thread_data *td, struct io_u *io_u)
225 return FIO_Q_COMPLETED;
228 FIO_STATIC struct ioengine_ops ioengine_server = {
229 .name = "librpma_apm_server",
230 .version = FIO_IOOPS_VERSION,
231 .init = librpma_fio_server_init,
232 .open_file = server_open_file,
233 .close_file = librpma_fio_server_close_file,
234 .queue = server_queue,
235 .invalidate = librpma_fio_file_nop,
236 .cleanup = librpma_fio_server_cleanup,
238 .options = librpma_fio_options,
239 .option_struct_size = sizeof(struct librpma_fio_options_values),
242 /* register both engines */
244 static void fio_init fio_librpma_apm_register(void)
246 register_ioengine(&ioengine_client);
247 register_ioengine(&ioengine_server);
250 static void fio_exit fio_librpma_apm_unregister(void)
252 unregister_ioengine(&ioengine_client);
253 unregister_ioengine(&ioengine_server);