ffa3769d337a1cb571b65dfa5ded416c98ae15b6
[fio.git] / engines / librpma_apm.c
1 /*
2 * librpma_apm: IO engine that uses PMDK librpma to read and write data,
3  *              based on Appliance Persistency Method
4  *
5  * Copyright 2020-2021, Intel Corporation
6  *
7  * This program is free software; you can redistribute it and/or
8  * modify it under the terms of the GNU General Public License,
9  * version 2 as published by the Free Software Foundation..
10  *
11  * This program is distributed in the hope that it will be useful,
12  * but WITHOUT ANY WARRANTY; without even the implied warranty of
13  * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
14  * GNU General Public License for more details.
15  */
16
17 #include "librpma_fio.h"
18
19 /* client side implementation */
20
21 static inline int client_io_flush(struct thread_data *td,
22                 struct io_u *first_io_u, struct io_u *last_io_u,
23                 unsigned long long int len);
24
25 static int client_get_io_u_index(struct rpma_completion *cmpl,
26                 unsigned int *io_u_index);
27
28 static int client_init(struct thread_data *td)
29 {
30         struct librpma_fio_client_data *ccd;
31         unsigned int sq_size;
32         uint32_t cq_size;
33         struct rpma_conn_cfg *cfg = NULL;
34         struct rpma_peer_cfg *pcfg = NULL;
35         int ret;
36
37         /* not supported readwrite = trim / randtrim / trimwrite */
38         if (td_trim(td)) {
39                 td_verror(td, EINVAL, "Not supported mode.");
40                 return -1;
41         }
42
43         /*
44          * Calculate the required queue sizes where:
45          * - the send queue (SQ) has to be big enough to accommodate
46          *   all io_us (WRITEs) and all flush requests (FLUSHes)
47          * - the completion queue (CQ) has to be big enough to accommodate all
48          *   success and error completions (cq_size = sq_size)
49          */
50         if (td_random(td) || td_rw(td)) {
51                 /*
52                  * sq_size = max(rand_read_sq_size, rand_write_sq_size)
53                  * where rand_read_sq_size < rand_write_sq_size because read
54                  * does not require flush afterwards
55                  * rand_write_sq_size = N * (WRITE + FLUSH)
56                  *
57                  * Note: rw is no different from random write since having
58                  * interleaved reads with writes in extreme forces you to flush
59                  * as often as when the writes are random.
60                  */
61                 sq_size = 2 * td->o.iodepth;
62         } else if (td_write(td)) {
63                 /* sequential TD_DDIR_WRITE only */
64                 if (td->o.sync_io) {
65                         sq_size = 2; /* WRITE + FLUSH */
66                 } else {
67                         /*
68                          * N * WRITE + B * FLUSH where:
69                          * - B == ceil(iodepth / iodepth_batch)
70                          *   which is the number of batches for N writes
71                          */
72                         sq_size = td->o.iodepth + LIBRPMA_FIO_CEIL(td->o.iodepth,
73                                         td->o.iodepth_batch);
74                 }
75         } else {
76                 /* TD_DDIR_READ only */
77                 if (td->o.sync_io) {
78                         sq_size = 1; /* READ */
79                 } else {
80                         sq_size = td->o.iodepth; /* N x READ */
81                 }
82         }
83         cq_size = sq_size;
84
85         /* create a connection configuration object */
86         if ((ret = rpma_conn_cfg_new(&cfg))) {
87                 librpma_td_verror(td, ret, "rpma_conn_cfg_new");
88                 return -1;
89         }
90
91         /* apply queue sizes */
92         if ((ret = rpma_conn_cfg_set_sq_size(cfg, sq_size))) {
93                 librpma_td_verror(td, ret, "rpma_conn_cfg_set_sq_size");
94                 goto err_cfg_delete;
95         }
96         if ((ret = rpma_conn_cfg_set_cq_size(cfg, cq_size))) {
97                 librpma_td_verror(td, ret, "rpma_conn_cfg_set_cq_size");
98                 goto err_cfg_delete;
99         }
100
101         if (librpma_fio_client_init(td, cfg))
102                 goto err_cfg_delete;
103
104         ccd = td->io_ops_data;
105
106         if (ccd->server_mr_flush_type == RPMA_FLUSH_TYPE_PERSISTENT) {
107                 if (!ccd->ws->direct_write_to_pmem) {
108                         if (td->thread_number == 1)
109                                 log_err(
110                                         "Fio librpma engine will not work until the Direct Write to PMem on the server side is possible (direct_write_to_pmem)\n");
111                         goto err_cleanup_common;
112                 }
113
114                 /* configure peer's direct write to pmem support */
115                 if ((ret = rpma_peer_cfg_new(&pcfg))) {
116                         librpma_td_verror(td, ret, "rpma_peer_cfg_new");
117                         goto err_cleanup_common;
118                 }
119
120                 if ((ret = rpma_peer_cfg_set_direct_write_to_pmem(pcfg, true))) {
121                         librpma_td_verror(td, ret,
122                                 "rpma_peer_cfg_set_direct_write_to_pmem");
123                         (void) rpma_peer_cfg_delete(&pcfg);
124                         goto err_cleanup_common;
125                 }
126
127                 if ((ret = rpma_conn_apply_remote_peer_cfg(ccd->conn, pcfg))) {
128                         librpma_td_verror(td, ret,
129                                 "rpma_conn_apply_remote_peer_cfg");
130                         (void) rpma_peer_cfg_delete(&pcfg);
131                         goto err_cleanup_common;
132                 }
133
134                 (void) rpma_peer_cfg_delete(&pcfg);
135         } else if (td->thread_number == 1) {
136                 /* XXX log_info mixes with the JSON output */
137                 log_err(
138                         "Note: Direct Write to PMem is not supported by default nor required if you use DRAM instead of PMem on the server side (direct_write_to_pmem).\n"
139                         "Remember that flushing to DRAM does not make your data persistent and may be used only for experimental purposes.\n");
140         }
141
142         if ((ret = rpma_conn_cfg_delete(&cfg))) {
143                 librpma_td_verror(td, ret, "rpma_conn_cfg_delete");
144                 /* non fatal error - continue */
145         }
146
147         ccd->flush = client_io_flush;
148         ccd->get_io_u_index = client_get_io_u_index;
149
150         return 0;
151
152 err_cleanup_common:
153         librpma_fio_client_cleanup(td);
154
155 err_cfg_delete:
156         (void) rpma_conn_cfg_delete(&cfg);
157
158         return -1;
159 }
160
161 static void client_cleanup(struct thread_data *td)
162 {
163         struct librpma_fio_client_data *ccd = td->io_ops_data;
164
165         if (ccd == NULL)
166                 return;
167
168         free(ccd->client_data);
169
170         librpma_fio_client_cleanup(td);
171 }
172
173 static inline int client_io_flush(struct thread_data *td,
174                 struct io_u *first_io_u, struct io_u *last_io_u,
175                 unsigned long long int len)
176 {
177         struct librpma_fio_client_data *ccd = td->io_ops_data;
178         size_t dst_offset = first_io_u->offset;
179         int ret;
180
181         if ((ret = rpma_flush(ccd->conn, ccd->server_mr, dst_offset, len,
182                         ccd->server_mr_flush_type, RPMA_F_COMPLETION_ALWAYS,
183                         (void *)(uintptr_t)last_io_u->index))) {
184                 librpma_td_verror(td, ret, "rpma_flush");
185                 return -1;
186         }
187
188         return 0;
189 }
190
191 static int client_get_io_u_index(struct rpma_completion *cmpl,
192                 unsigned int *io_u_index)
193 {
194         memcpy(io_u_index, &cmpl->op_context, sizeof(*io_u_index));
195
196         return 1;
197 }
198
199 FIO_STATIC struct ioengine_ops ioengine_client = {
200         .name                   = "librpma_apm_client",
201         .version                = FIO_IOOPS_VERSION,
202         .init                   = client_init,
203         .post_init              = librpma_fio_client_post_init,
204         .get_file_size          = librpma_fio_client_get_file_size,
205         .open_file              = librpma_fio_file_nop,
206         .queue                  = librpma_fio_client_queue,
207         .commit                 = librpma_fio_client_commit,
208         .getevents              = librpma_fio_client_getevents,
209         .event                  = librpma_fio_client_event,
210         .errdetails             = librpma_fio_client_errdetails,
211         .close_file             = librpma_fio_file_nop,
212         .cleanup                = client_cleanup,
213         .flags                  = FIO_DISKLESSIO,
214         .options                = librpma_fio_options,
215         .option_struct_size     = sizeof(struct librpma_fio_options_values),
216 };
217
218 /* server side implementation */
219
220 static int server_open_file(struct thread_data *td, struct fio_file *f)
221 {
222         return librpma_fio_server_open_file(td, f, NULL);
223 }
224
225 static enum fio_q_status server_queue(struct thread_data *td, struct io_u *io_u)
226 {
227         return FIO_Q_COMPLETED;
228 }
229
230 FIO_STATIC struct ioengine_ops ioengine_server = {
231         .name                   = "librpma_apm_server",
232         .version                = FIO_IOOPS_VERSION,
233         .init                   = librpma_fio_server_init,
234         .open_file              = server_open_file,
235         .close_file             = librpma_fio_server_close_file,
236         .queue                  = server_queue,
237         .invalidate             = librpma_fio_file_nop,
238         .cleanup                = librpma_fio_server_cleanup,
239         .flags                  = FIO_SYNCIO,
240         .options                = librpma_fio_options,
241         .option_struct_size     = sizeof(struct librpma_fio_options_values),
242 };
243
244 /* register both engines */
245
246 static void fio_init fio_librpma_apm_register(void)
247 {
248         register_ioengine(&ioengine_client);
249         register_ioengine(&ioengine_server);
250 }
251
252 static void fio_exit fio_librpma_apm_unregister(void)
253 {
254         unregister_ioengine(&ioengine_client);
255         unregister_ioengine(&ioengine_server);
256 }