engines/librpma_apm.c
/*
 * librpma_apm: IO engine that uses PMDK librpma to read and write data,
 * based on Appliance Persistency Method
 *
 * Copyright 2020-2021, Intel Corporation
 *
 * This program is free software; you can redistribute it and/or
 * modify it under the terms of the GNU General Public License,
 * version 2 as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 */

#include "librpma_fio.h"

/* client side implementation */

static inline int client_io_flush(struct thread_data *td,
		struct io_u *first_io_u, struct io_u *last_io_u,
		unsigned long long int len);

static int client_get_io_u_index(struct rpma_completion *cmpl,
		unsigned int *io_u_index);

static int client_init(struct thread_data *td)
{
	struct librpma_fio_client_data *ccd;
	unsigned int sq_size;
	uint32_t cq_size;
	struct rpma_conn_cfg *cfg = NULL;
	struct rpma_peer_cfg *pcfg = NULL;
	int ret;

	/* readwrite = trim / randtrim / trimwrite is not supported */
	if (td_trim(td)) {
		td_verror(td, EINVAL, "Not supported mode.");
		return -1;
	}

	/*
	 * Calculate the required queue sizes where:
	 * - the send queue (SQ) has to be big enough to accommodate
	 *   all io_us (WRITEs) and all flush requests (FLUSHes)
	 * - the completion queue (CQ) has to be big enough to accommodate all
	 *   success and error completions (cq_size = sq_size)
	 */
	if (td_random(td) || td_rw(td)) {
		/*
		 * sq_size = max(rand_read_sq_size, rand_write_sq_size)
		 * where rand_read_sq_size < rand_write_sq_size because read
		 * does not require flush afterwards
		 * rand_write_sq_size = N * (WRITE + FLUSH)
		 *
		 * Note: rw is no different from random write since having
		 * interleaved reads with writes in extreme forces you to flush
		 * as often as when the writes are random.
		 */
		sq_size = 2 * td->o.iodepth;
	} else if (td_write(td)) {
		/* sequential TD_DDIR_WRITE only */
		if (td->o.sync_io) {
			sq_size = 2; /* WRITE + FLUSH */
		} else {
			/*
			 * N * WRITE + B * FLUSH where:
			 * - B == ceil(iodepth / iodepth_batch)
			 *   which is the number of batches for N writes
			 */
			sq_size = td->o.iodepth + LIBRPMA_FIO_CEIL(td->o.iodepth,
					td->o.iodepth_batch);
		}
	} else {
		/* TD_DDIR_READ only */
		if (td->o.sync_io) {
			sq_size = 1; /* READ */
		} else {
			sq_size = td->o.iodepth; /* N x READ */
		}
	}
	cq_size = sq_size;
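	/*
	 * Worked example (hypothetical job parameters): a random-write job
	 * with iodepth=16 needs sq_size = 2 * 16 = 32, whereas a sequential
	 * asynchronous write job with iodepth=16 and iodepth_batch=4 needs
	 * sq_size = 16 + ceil(16 / 4) = 20. In both cases cq_size = sq_size.
	 */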

	/* create a connection configuration object */
	if ((ret = rpma_conn_cfg_new(&cfg))) {
		librpma_td_verror(td, ret, "rpma_conn_cfg_new");
		return -1;
	}

	/* apply queue sizes */
	if ((ret = rpma_conn_cfg_set_sq_size(cfg, sq_size))) {
		librpma_td_verror(td, ret, "rpma_conn_cfg_set_sq_size");
		goto err_cfg_delete;
	}
	if ((ret = rpma_conn_cfg_set_cq_size(cfg, cq_size))) {
		librpma_td_verror(td, ret, "rpma_conn_cfg_set_cq_size");
		goto err_cfg_delete;
	}

	if (librpma_fio_client_init(td, cfg))
		goto err_cfg_delete;

	ccd = td->io_ops_data;

	if (ccd->server_mr_flush_type == RPMA_FLUSH_TYPE_PERSISTENT) {
		if (!ccd->ws->direct_write_to_pmem) {
			if (td->thread_number == 1)
				log_err(
					"Fio librpma engine will not work until the Direct Write to PMem on the server side is possible (direct_write_to_pmem)\n");
			goto err_cleanup_common;
		}

		/* configure peer's direct write to pmem support */
		if ((ret = rpma_peer_cfg_new(&pcfg))) {
			librpma_td_verror(td, ret, "rpma_peer_cfg_new");
			goto err_cleanup_common;
		}

		if ((ret = rpma_peer_cfg_set_direct_write_to_pmem(pcfg, true))) {
			librpma_td_verror(td, ret,
				"rpma_peer_cfg_set_direct_write_to_pmem");
			(void) rpma_peer_cfg_delete(&pcfg);
			goto err_cleanup_common;
		}

		if ((ret = rpma_conn_apply_remote_peer_cfg(ccd->conn, pcfg))) {
			librpma_td_verror(td, ret,
				"rpma_conn_apply_remote_peer_cfg");
			(void) rpma_peer_cfg_delete(&pcfg);
			goto err_cleanup_common;
		}

		(void) rpma_peer_cfg_delete(&pcfg);
	} else if (td->thread_number == 1) {
		/* XXX log_info mixes with the JSON output */
		log_err(
			"Note: Direct Write to PMem is not supported by default nor required if you use DRAM instead of PMem on the server side (direct_write_to_pmem).\n"
			"Remember that flushing to DRAM does not make your data persistent and may be used only for experimental purposes.\n");
	}

	if ((ret = rpma_conn_cfg_delete(&cfg))) {
		librpma_td_verror(td, ret, "rpma_conn_cfg_delete");
		/* non-fatal error - continue */
	}

	ccd->flush = client_io_flush;
	ccd->get_io_u_index = client_get_io_u_index;

	return 0;

err_cleanup_common:
	librpma_fio_client_cleanup(td);

err_cfg_delete:
	(void) rpma_conn_cfg_delete(&cfg);

	return -1;
}

static void client_cleanup(struct thread_data *td)
{
	struct librpma_fio_client_data *ccd = td->io_ops_data;

	if (ccd == NULL)
		return;

	free(ccd->client_data);

	librpma_fio_client_cleanup(td);
}

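/*
 * client_io_flush posts a single rpma_flush covering the contiguous region
 * that starts at first_io_u->offset and spans len bytes. last_io_u->index is
 * passed as the operation context so the resulting completion can be matched
 * back to an io_u by client_get_io_u_index() below.
 */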
static inline int client_io_flush(struct thread_data *td,
		struct io_u *first_io_u, struct io_u *last_io_u,
		unsigned long long int len)
{
	struct librpma_fio_client_data *ccd = td->io_ops_data;
	size_t dst_offset = first_io_u->offset;
	int ret;

	if ((ret = rpma_flush(ccd->conn, ccd->server_mr, dst_offset, len,
			ccd->server_mr_flush_type, RPMA_F_COMPLETION_ALWAYS,
			(void *)(uintptr_t)last_io_u->index))) {
		librpma_td_verror(td, ret, "rpma_flush");
		return -1;
	}

	return 0;
}

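/*
 * client_get_io_u_index recovers the io_u index from the completion's
 * op_context (the value attached when the operation was posted, e.g. by
 * client_io_flush() above) and reports one matching io_u per completion.
 */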
static int client_get_io_u_index(struct rpma_completion *cmpl,
		unsigned int *io_u_index)
{
	memcpy(io_u_index, &cmpl->op_context, sizeof(*io_u_index));

	return 1;
}

FIO_STATIC struct ioengine_ops ioengine_client = {
	.name = "librpma_apm_client",
	.version = FIO_IOOPS_VERSION,
	.init = client_init,
	.post_init = librpma_fio_client_post_init,
	.get_file_size = librpma_fio_client_get_file_size,
	.open_file = librpma_fio_file_nop,
	.queue = librpma_fio_client_queue,
	.commit = librpma_fio_client_commit,
	.getevents = librpma_fio_client_getevents,
	.event = librpma_fio_client_event,
	.errdetails = librpma_fio_client_errdetails,
	.close_file = librpma_fio_file_nop,
	.cleanup = client_cleanup,
	.flags = FIO_DISKLESSIO,
	.options = librpma_fio_options,
	.option_struct_size = sizeof(struct librpma_fio_options_values),
};

/* server side implementation */

static int server_open_file(struct thread_data *td, struct fio_file *f)
{
	return librpma_fio_server_open_file(td, f, NULL);
}

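/*
 * The server side of the APM engine does no I/O of its own: the client
 * writes to and flushes the server's memory region remotely, so any io_u
 * queued here is reported as completed immediately.
 */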
static enum fio_q_status server_queue(struct thread_data *td, struct io_u *io_u)
{
	return FIO_Q_COMPLETED;
}

FIO_STATIC struct ioengine_ops ioengine_server = {
	.name = "librpma_apm_server",
	.version = FIO_IOOPS_VERSION,
	.init = librpma_fio_server_init,
	.open_file = server_open_file,
	.close_file = librpma_fio_server_close_file,
	.queue = server_queue,
	.invalidate = librpma_fio_file_nop,
	.cleanup = librpma_fio_server_cleanup,
	.flags = FIO_SYNCIO,
	.options = librpma_fio_options,
	.option_struct_size = sizeof(struct librpma_fio_options_values),
};

/* register both engines */

static void fio_init fio_librpma_apm_register(void)
{
	register_ioengine(&ioengine_client);
	register_ioengine(&ioengine_server);
}

static void fio_exit fio_librpma_apm_unregister(void)
{
	unregister_ioengine(&ioengine_client);
	unregister_ioengine(&ioengine_server);
}