t/nvmept_trim: increase transfer size for some tests
[fio.git] / engines / librpma_apm.c
1 /*
2 * librpma_apm: IO engine that uses PMDK librpma to read and write data,
3  *              based on Appliance Persistency Method
4  *
5  * Copyright 2020-2021, Intel Corporation
6  *
7  * This program is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU General Public License,
9  * version 2 as published by the Free Software Foundation..
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  */
16
17 #include "librpma_fio.h"
18
19 /* client side implementation */
20
21 static inline int client_io_flush(struct thread_data *td,
22                 struct io_u *first_io_u, struct io_u *last_io_u,
23                 unsigned long long int len);
24
25 static int client_get_io_u_index(struct ibv_wc *wc, unsigned int *io_u_index);
26
27 static int client_init(struct thread_data *td)
28 {
29         struct librpma_fio_client_data *ccd;
30         unsigned int sq_size;
31         uint32_t cq_size;
32         struct rpma_conn_cfg *cfg = NULL;
33         struct rpma_peer_cfg *pcfg = NULL;
34         int ret;
35
36         /* not supported readwrite = trim / randtrim / trimwrite */
37         if (td_trim(td)) {
38                 td_verror(td, EINVAL, "Not supported mode.");
39                 return -1;
40         }
41
42         /*
43          * Calculate the required queue sizes where:
44          * - the send queue (SQ) has to be big enough to accommodate
45          *   all io_us (WRITEs) and all flush requests (FLUSHes)
46          * - the completion queue (CQ) has to be big enough to accommodate all
47          *   success and error completions (cq_size = sq_size)
48          */
49         if (td_random(td) || td_rw(td)) {
50                 /*
51                  * sq_size = max(rand_read_sq_size, rand_write_sq_size)
52                  * where rand_read_sq_size < rand_write_sq_size because read
53                  * does not require flush afterwards
54                  * rand_write_sq_size = N * (WRITE + FLUSH)
55                  *
56                  * Note: rw is no different from random write since having
57                  * interleaved reads with writes in extreme forces you to flush
58                  * as often as when the writes are random.
59                  */
60                 sq_size = 2 * td->o.iodepth;
61         } else if (td_write(td)) {
62                 /* sequential TD_DDIR_WRITE only */
63                 if (td->o.sync_io) {
64                         sq_size = 2; /* WRITE + FLUSH */
65                 } else {
66                         /*
67                          * N * WRITE + B * FLUSH where:
68                          * - B == ceil(iodepth / iodepth_batch)
69                          *   which is the number of batches for N writes
70                          */
71                         sq_size = td->o.iodepth + LIBRPMA_FIO_CEIL(td->o.iodepth,
72                                         td->o.iodepth_batch);
73                 }
74         } else {
75                 /* TD_DDIR_READ only */
76                 if (td->o.sync_io) {
77                         sq_size = 1; /* READ */
78                 } else {
79                         sq_size = td->o.iodepth; /* N x READ */
80                 }
81         }
82         cq_size = sq_size;
83
84         /* create a connection configuration object */
85         if ((ret = rpma_conn_cfg_new(&cfg))) {
86                 librpma_td_verror(td, ret, "rpma_conn_cfg_new");
87                 return -1;
88         }
89
90         /* apply queue sizes */
91         if ((ret = rpma_conn_cfg_set_sq_size(cfg, sq_size))) {
92                 librpma_td_verror(td, ret, "rpma_conn_cfg_set_sq_size");
93                 goto err_cfg_delete;
94         }
95         if ((ret = rpma_conn_cfg_set_cq_size(cfg, cq_size))) {
96                 librpma_td_verror(td, ret, "rpma_conn_cfg_set_cq_size");
97                 goto err_cfg_delete;
98         }
99
100         if (librpma_fio_client_init(td, cfg))
101                 goto err_cfg_delete;
102
103         ccd = td->io_ops_data;
104
105         if (ccd->server_mr_flush_type == RPMA_FLUSH_TYPE_PERSISTENT) {
106                 if (!ccd->ws->direct_write_to_pmem) {
107                         if (td->thread_number == 1)
108                                 log_err(
109                                         "Fio librpma engine will not work until the Direct Write to PMem on the server side is possible (direct_write_to_pmem)\n");
110                         goto err_cleanup_common;
111                 }
112
113                 /* configure peer's direct write to pmem support */
114                 if ((ret = rpma_peer_cfg_new(&pcfg))) {
115                         librpma_td_verror(td, ret, "rpma_peer_cfg_new");
116                         goto err_cleanup_common;
117                 }
118
119                 if ((ret = rpma_peer_cfg_set_direct_write_to_pmem(pcfg, true))) {
120                         librpma_td_verror(td, ret,
121                                 "rpma_peer_cfg_set_direct_write_to_pmem");
122                         (void) rpma_peer_cfg_delete(&pcfg);
123                         goto err_cleanup_common;
124                 }
125
126                 if ((ret = rpma_conn_apply_remote_peer_cfg(ccd->conn, pcfg))) {
127                         librpma_td_verror(td, ret,
128                                 "rpma_conn_apply_remote_peer_cfg");
129                         (void) rpma_peer_cfg_delete(&pcfg);
130                         goto err_cleanup_common;
131                 }
132
133                 (void) rpma_peer_cfg_delete(&pcfg);
134         } else if (td->thread_number == 1) {
135                 /* XXX log_info mixes with the JSON output */
136                 log_err(
137                         "Note: Direct Write to PMem is not supported by default nor required if you use DRAM instead of PMem on the server side (direct_write_to_pmem).\n"
138                         "Remember that flushing to DRAM does not make your data persistent and may be used only for experimental purposes.\n");
139         }
140
141         if ((ret = rpma_conn_cfg_delete(&cfg))) {
142                 librpma_td_verror(td, ret, "rpma_conn_cfg_delete");
143                 /* non fatal error - continue */
144         }
145
146         ccd->flush = client_io_flush;
147         ccd->get_io_u_index = client_get_io_u_index;
148
149         return 0;
150
151 err_cleanup_common:
152         librpma_fio_client_cleanup(td);
153
154 err_cfg_delete:
155         (void) rpma_conn_cfg_delete(&cfg);
156
157         return -1;
158 }
159
160 static void client_cleanup(struct thread_data *td)
161 {
162         struct librpma_fio_client_data *ccd = td->io_ops_data;
163
164         if (ccd == NULL)
165                 return;
166
167         free(ccd->client_data);
168
169         librpma_fio_client_cleanup(td);
170 }
171
172 static inline int client_io_flush(struct thread_data *td,
173                 struct io_u *first_io_u, struct io_u *last_io_u,
174                 unsigned long long int len)
175 {
176         struct librpma_fio_client_data *ccd = td->io_ops_data;
177         size_t dst_offset = first_io_u->offset;
178         int ret;
179
180         if ((ret = rpma_flush(ccd->conn, ccd->server_mr, dst_offset, len,
181                         ccd->server_mr_flush_type, RPMA_F_COMPLETION_ALWAYS,
182                         (void *)(uintptr_t)last_io_u->index))) {
183                 librpma_td_verror(td, ret, "rpma_flush");
184                 return -1;
185         }
186
187         return 0;
188 }
189
190 static int client_get_io_u_index(struct ibv_wc *wc, unsigned int *io_u_index)
191 {
192         memcpy(io_u_index, &wc->wr_id, sizeof(*io_u_index));
193
194         return 1;
195 }
196
197 FIO_STATIC struct ioengine_ops ioengine_client = {
198         .name                   = "librpma_apm_client",
199         .version                = FIO_IOOPS_VERSION,
200         .init                   = client_init,
201         .post_init              = librpma_fio_client_post_init,
202         .get_file_size          = librpma_fio_client_get_file_size,
203         .open_file              = librpma_fio_file_nop,
204         .queue                  = librpma_fio_client_queue,
205         .commit                 = librpma_fio_client_commit,
206         .getevents              = librpma_fio_client_getevents,
207         .event                  = librpma_fio_client_event,
208         .errdetails             = librpma_fio_client_errdetails,
209         .close_file             = librpma_fio_file_nop,
210         .cleanup                = client_cleanup,
211         .flags                  = FIO_DISKLESSIO | FIO_ASYNCIO_SETS_ISSUE_TIME,
212         .options                = librpma_fio_options,
213         .option_struct_size     = sizeof(struct librpma_fio_options_values),
214 };
215
216 /* server side implementation */
217
218 static int server_open_file(struct thread_data *td, struct fio_file *f)
219 {
220         return librpma_fio_server_open_file(td, f, NULL);
221 }
222
223 static enum fio_q_status server_queue(struct thread_data *td, struct io_u *io_u)
224 {
225         return FIO_Q_COMPLETED;
226 }
227
228 FIO_STATIC struct ioengine_ops ioengine_server = {
229         .name                   = "librpma_apm_server",
230         .version                = FIO_IOOPS_VERSION,
231         .init                   = librpma_fio_server_init,
232         .open_file              = server_open_file,
233         .close_file             = librpma_fio_server_close_file,
234         .queue                  = server_queue,
235         .invalidate             = librpma_fio_file_nop,
236         .cleanup                = librpma_fio_server_cleanup,
237         .flags                  = FIO_SYNCIO,
238         .options                = librpma_fio_options,
239         .option_struct_size     = sizeof(struct librpma_fio_options_values),
240 };
241
242 /* register both engines */
243
244 static void fio_init fio_librpma_apm_register(void)
245 {
246         register_ioengine(&ioengine_client);
247         register_ioengine(&ioengine_server);
248 }
249
250 static void fio_exit fio_librpma_apm_unregister(void)
251 {
252         unregister_ioengine(&ioengine_client);
253         unregister_ioengine(&ioengine_server);
254 }