engines/librpma_apm.c
/*
 * librpma_apm: IO engine that uses PMDK librpma to read and write data,
 * based on Appliance Persistency Method
 *
 * Copyright 2020-2021, Intel Corporation
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License,
 * version 2 as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 */

#include "librpma_fio.h"

/* client side implementation */

static inline int client_io_flush(struct thread_data *td,
		struct io_u *first_io_u, struct io_u *last_io_u,
		unsigned long long int len);

static int client_get_io_u_index(struct ibv_wc *wc, unsigned int *io_u_index);

static int client_init(struct thread_data *td)
{
	struct librpma_fio_client_data *ccd;
	unsigned int sq_size;
	uint32_t cq_size;
	struct rpma_conn_cfg *cfg = NULL;
	struct rpma_peer_cfg *pcfg = NULL;
	int ret;

	/* not supported readwrite = trim / randtrim / trimwrite */
	if (td_trim(td)) {
		td_verror(td, EINVAL, "Not supported mode.");
		return -1;
	}

	/*
	 * Calculate the required queue sizes where:
	 * - the send queue (SQ) has to be big enough to accommodate
	 *   all io_us (WRITEs) and all flush requests (FLUSHes)
	 * - the completion queue (CQ) has to be big enough to accommodate all
	 *   success and error completions (cq_size = sq_size)
	 */
	if (td_random(td) || td_rw(td)) {
		/*
		 * sq_size = max(rand_read_sq_size, rand_write_sq_size)
		 * where rand_read_sq_size < rand_write_sq_size because read
		 * does not require flush afterwards
		 * rand_write_sq_size = N * (WRITE + FLUSH)
		 *
		 * Note: rw is no different from random write since having
		 * interleaved reads with writes in extreme forces you to flush
		 * as often as when the writes are random.
		 */
		sq_size = 2 * td->o.iodepth;
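		/*
		 * Illustrative example (assumed value, not from any job file):
		 * iodepth=4 reserves 4 WRITE + 4 FLUSH slots,
		 * i.e. sq_size = 2 * 4 = 8.
		 */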
	} else if (td_write(td)) {
		/* sequential TD_DDIR_WRITE only */
		if (td->o.sync_io) {
			sq_size = 2; /* WRITE + FLUSH */
		} else {
			/*
			 * N * WRITE + B * FLUSH where:
			 * - B == ceil(iodepth / iodepth_batch)
			 *   which is the number of batches for N writes
			 */
			sq_size = td->o.iodepth + LIBRPMA_FIO_CEIL(td->o.iodepth,
					td->o.iodepth_batch);
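			/*
			 * For example (assumed values): iodepth=8 with
			 * iodepth_batch=2 gives sq_size = 8 + ceil(8 / 2) = 12.
			 */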
		}
	} else {
		/* TD_DDIR_READ only */
		if (td->o.sync_io) {
			sq_size = 1; /* READ */
		} else {
			sq_size = td->o.iodepth; /* N x READ */
		}
	}
	cq_size = sq_size;

	/* create a connection configuration object */
	if ((ret = rpma_conn_cfg_new(&cfg))) {
		librpma_td_verror(td, ret, "rpma_conn_cfg_new");
		return -1;
	}

	/* apply queue sizes */
	if ((ret = rpma_conn_cfg_set_sq_size(cfg, sq_size))) {
		librpma_td_verror(td, ret, "rpma_conn_cfg_set_sq_size");
		goto err_cfg_delete;
	}
	if ((ret = rpma_conn_cfg_set_cq_size(cfg, cq_size))) {
		librpma_td_verror(td, ret, "rpma_conn_cfg_set_cq_size");
		goto err_cfg_delete;
	}

	if (librpma_fio_client_init(td, cfg))
		goto err_cfg_delete;

	ccd = td->io_ops_data;

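	/*
	 * When the server-side memory region is flushed with
	 * RPMA_FLUSH_TYPE_PERSISTENT (PMem on the server), APM works only if
	 * the server advertises Direct Write to PMem support
	 * (direct_write_to_pmem); that is verified and configured below.
	 */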
	if (ccd->server_mr_flush_type == RPMA_FLUSH_TYPE_PERSISTENT) {
		if (!ccd->ws->direct_write_to_pmem) {
			if (td->thread_number == 1)
				log_err(
					"Fio librpma engine will not work until the Direct Write to PMem on the server side is possible (direct_write_to_pmem)\n");
			goto err_cleanup_common;
		}

		/* configure peer's direct write to pmem support */
		if ((ret = rpma_peer_cfg_new(&pcfg))) {
			librpma_td_verror(td, ret, "rpma_peer_cfg_new");
			goto err_cleanup_common;
		}

		if ((ret = rpma_peer_cfg_set_direct_write_to_pmem(pcfg, true))) {
			librpma_td_verror(td, ret,
				"rpma_peer_cfg_set_direct_write_to_pmem");
			(void) rpma_peer_cfg_delete(&pcfg);
			goto err_cleanup_common;
		}

		if ((ret = rpma_conn_apply_remote_peer_cfg(ccd->conn, pcfg))) {
			librpma_td_verror(td, ret,
				"rpma_conn_apply_remote_peer_cfg");
			(void) rpma_peer_cfg_delete(&pcfg);
			goto err_cleanup_common;
		}

		(void) rpma_peer_cfg_delete(&pcfg);
	} else if (td->thread_number == 1) {
		/* XXX log_info mixes with the JSON output */
		log_err(
			"Note: Direct Write to PMem is not supported by default nor required if you use DRAM instead of PMem on the server side (direct_write_to_pmem).\n"
			"Remember that flushing to DRAM does not make your data persistent and may be used only for experimental purposes.\n");
	}

	if ((ret = rpma_conn_cfg_delete(&cfg))) {
		librpma_td_verror(td, ret, "rpma_conn_cfg_delete");
		/* non fatal error - continue */
	}

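	/*
	 * Hand the APM-specific callbacks over to the common librpma_fio
	 * client code: client_io_flush posts the flush following the writes
	 * and client_get_io_u_index maps a completion back to its io_u.
	 */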
	ccd->flush = client_io_flush;
	ccd->get_io_u_index = client_get_io_u_index;

	return 0;

err_cleanup_common:
	librpma_fio_client_cleanup(td);

err_cfg_delete:
	(void) rpma_conn_cfg_delete(&cfg);

	return -1;
}

static void client_cleanup(struct thread_data *td)
{
	struct librpma_fio_client_data *ccd = td->io_ops_data;

	if (ccd == NULL)
		return;

	free(ccd->client_data);

	librpma_fio_client_cleanup(td);
}

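/*
 * Post a single rpma_flush covering the region just written: it spans len
 * bytes starting at first_io_u->offset and carries last_io_u->index as the
 * completion context, so the resulting completion can be matched back to
 * the last io_u of the flushed batch.
 */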
static inline int client_io_flush(struct thread_data *td,
		struct io_u *first_io_u, struct io_u *last_io_u,
		unsigned long long int len)
{
	struct librpma_fio_client_data *ccd = td->io_ops_data;
	size_t dst_offset = first_io_u->offset;
	int ret;

	if ((ret = rpma_flush(ccd->conn, ccd->server_mr, dst_offset, len,
			ccd->server_mr_flush_type, RPMA_F_COMPLETION_ALWAYS,
			(void *)(uintptr_t)last_io_u->index))) {
		librpma_td_verror(td, ret, "rpma_flush");
		return -1;
	}

	return 0;
}

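/*
 * Recover the io_u index stored in the work completion's wr_id (set when the
 * operation was posted) and report that a valid index was produced.
 */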
static int client_get_io_u_index(struct ibv_wc *wc, unsigned int *io_u_index)
{
	memcpy(io_u_index, &wc->wr_id, sizeof(*io_u_index));

	return 1;
}

FIO_STATIC struct ioengine_ops ioengine_client = {
	.name = "librpma_apm_client",
	.version = FIO_IOOPS_VERSION,
	.init = client_init,
	.post_init = librpma_fio_client_post_init,
	.get_file_size = librpma_fio_client_get_file_size,
	.open_file = librpma_fio_file_nop,
	.queue = librpma_fio_client_queue,
	.commit = librpma_fio_client_commit,
	.getevents = librpma_fio_client_getevents,
	.event = librpma_fio_client_event,
	.errdetails = librpma_fio_client_errdetails,
	.close_file = librpma_fio_file_nop,
	.cleanup = client_cleanup,
	.flags = FIO_DISKLESSIO | FIO_ASYNCIO_SETS_ISSUE_TIME,
	.options = librpma_fio_options,
	.option_struct_size = sizeof(struct librpma_fio_options_values),
};

/* server side implementation */

static int server_open_file(struct thread_data *td, struct fio_file *f)
{
	return librpma_fio_server_open_file(td, f, NULL);
}

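/*
 * In APM all data movement is driven by the client over RDMA; the server
 * only exposes its memory region, so there is nothing to do per io_u and
 * queued requests complete immediately.
 */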
static enum fio_q_status server_queue(struct thread_data *td, struct io_u *io_u)
{
	return FIO_Q_COMPLETED;
}

FIO_STATIC struct ioengine_ops ioengine_server = {
	.name = "librpma_apm_server",
	.version = FIO_IOOPS_VERSION,
	.init = librpma_fio_server_init,
	.open_file = server_open_file,
	.close_file = librpma_fio_server_close_file,
	.queue = server_queue,
	.invalidate = librpma_fio_file_nop,
	.cleanup = librpma_fio_server_cleanup,
	.flags = FIO_SYNCIO,
	.options = librpma_fio_options,
	.option_struct_size = sizeof(struct librpma_fio_options_values),
};
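
/*
 * Example usage (a sketch only; serverip and port are option names from the
 * shared librpma_fio option table, everything else is an assumed value that
 * may need adjusting for a real setup):
 *
 * server job:
 *   [server]
 *   ioengine=librpma_apm_server
 *   serverip=192.168.0.1
 *   port=7204
 *   filename=/mnt/pmem/fio_server_file
 *
 * client job:
 *   [client]
 *   ioengine=librpma_apm_client
 *   serverip=192.168.0.1
 *   port=7204
 *   rw=write
 *   blocksize=4KiB
 *   iodepth=2
 */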

/* register both engines */

static void fio_init fio_librpma_apm_register(void)
{
	register_ioengine(&ioengine_client);
	register_ioengine(&ioengine_server);
}

static void fio_exit fio_librpma_apm_unregister(void)
{
	unregister_ioengine(&ioengine_client);
	unregister_ioengine(&ioengine_server);
}