rdma: use private random state
[fio.git] / engines / rdma.c
1 /*
2  * RDMA I/O engine
3  *
4  * RDMA I/O engine based on the IB verbs and RDMA/CM user space libraries.
5  * Supports both RDMA memory semantics and channel semantics
6  *   for the InfiniBand, RoCE and iWARP protocols.
7  *
8  * This I/O engine is disabled by default. To enable it, execute:
9  *
10  * $ export EXTFLAGS+=" -DFIO_HAVE_RDMA "
11  * $ export EXTLIBS+=" -libverbs -lrdmacm "
12  *
13  * before running make. You will need the Linux RDMA software as well, either
14  * from your Linux distributor or directly from openfabrics.org:
15  *
16  * http://www.openfabrics.org/downloads/OFED/
17  *
18  * Exchanging steps of RDMA ioengine control messages:
19  *      1. client side sends test mode (RDMA_WRITE/RDMA_READ/SEND)
20  *         to server side.
21  *      2. server side parses test mode, and sends back confirmation
22  *         to client side. In RDMA WRITE/READ test, this confirmation
23  *         includes memory information, such as rkey, address.
24  *      3. client side initiates test loop.
25  *      4. In RDMA WRITE/READ test, client side sends a completion
26  *         notification to server side. Server side updates its
27  *         td->done as true.
28  *
29  */
30 #include <stdio.h>
31 #include <stdlib.h>
32 #include <unistd.h>
33 #include <errno.h>
34 #include <assert.h>
35 #include <netinet/in.h>
36 #include <arpa/inet.h>
37 #include <netdb.h>
38 #include <sys/poll.h>
39 #include <sys/types.h>
40 #include <sys/socket.h>
41 #include <sys/time.h>
42 #include <sys/resource.h>
43
44 #include <byteswap.h>
45 #include <pthread.h>
46 #include <inttypes.h>
47
48 #include "../fio.h"
49
50 #ifdef FIO_HAVE_RDMA
51
52 #include <rdma/rdma_cma.h>
53 #include <infiniband/arch.h>
54
55 #define FIO_RDMA_MAX_IO_DEPTH    512
56
57 enum rdma_io_mode {
58         FIO_RDMA_UNKNOWN = 0,
59         FIO_RDMA_MEM_WRITE,
60         FIO_RDMA_MEM_READ,
61         FIO_RDMA_CHA_SEND,
62         FIO_RDMA_CHA_RECV
63 };
64
65 struct remote_u {
66         uint64_t buf;
67         uint32_t rkey;
68         uint32_t size;
69 };
70
71 struct rdma_info_blk {
72         uint32_t mode;          /* channel semantic or memory semantic */
73         uint32_t nr;            /* client: io depth
74                                    server: number of records for memory semantic
75                                  */
76         struct remote_u rmt_us[FIO_RDMA_MAX_IO_DEPTH];
77 };
78
79 struct rdma_io_u_data {
80         uint64_t wr_id;
81         struct ibv_send_wr sq_wr;
82         struct ibv_recv_wr rq_wr;
83         struct ibv_sge rdma_sgl;
84 };
85
86 struct rdmaio_data {
87         int is_client;
88         enum rdma_io_mode rdma_protocol;
89         char host[64];
90         struct sockaddr_in addr;
91
92         struct ibv_recv_wr rq_wr;
93         struct ibv_sge recv_sgl;
94         struct rdma_info_blk recv_buf;
95         struct ibv_mr *recv_mr;
96
97         struct ibv_send_wr sq_wr;
98         struct ibv_sge send_sgl;
99         struct rdma_info_blk send_buf;
100         struct ibv_mr *send_mr;
101
102         struct ibv_comp_channel *channel;
103         struct ibv_cq *cq;
104         struct ibv_pd *pd;
105         struct ibv_qp *qp;
106
107         pthread_t cmthread;
108         struct rdma_event_channel *cm_channel;
109         struct rdma_cm_id *cm_id;
110         struct rdma_cm_id *child_cm_id;
111
112         int cq_event_num;
113
114         struct remote_u *rmt_us;
115         int rmt_nr;
116         struct io_u **io_us_queued;
117         int io_u_queued_nr;
118         struct io_u **io_us_flight;
119         int io_u_flight_nr;
120         struct io_u **io_us_completed;
121         int io_u_completed_nr;
122
123         struct frand_state rand_state;
124 };
125
126 static int client_recv(struct thread_data *td, struct ibv_wc *wc)
127 {
128         struct rdmaio_data *rd = td->io_ops->data;
129
130         if (wc->byte_len != sizeof(rd->recv_buf)) {
131                 log_err("Received bogus data, size %d\n", wc->byte_len);
132                 return 1;
133         }
134
135         /* store mr info for MEMORY semantic */
136         if ((rd->rdma_protocol == FIO_RDMA_MEM_WRITE) ||
137             (rd->rdma_protocol == FIO_RDMA_MEM_READ)) {
138                 /* struct flist_head *entry; */
139                 int i = 0;
140
141                 rd->rmt_nr = ntohl(rd->recv_buf.nr);
142
143                 for (i = 0; i < rd->rmt_nr; i++) {
144                         rd->rmt_us[i].buf = ntohll(rd->recv_buf.rmt_us[i].buf);
145                         rd->rmt_us[i].rkey = ntohl(rd->recv_buf.rmt_us[i].rkey);
146                         rd->rmt_us[i].size = ntohl(rd->recv_buf.rmt_us[i].size);
147
148                         dprint(FD_IO,
149                                "fio: Received rkey %x addr %" PRIx64
150                                " len %d from peer\n", rd->rmt_us[i].rkey,
151                                rd->rmt_us[i].buf, rd->rmt_us[i].size);
152                 }
153         }
154
155         return 0;
156 }
157
158 static int server_recv(struct thread_data *td, struct ibv_wc *wc)
159 {
160         struct rdmaio_data *rd = td->io_ops->data;
161
162         if (wc->wr_id == FIO_RDMA_MAX_IO_DEPTH) {
163                 rd->rdma_protocol = ntohl(rd->recv_buf.mode);
164
165                 /* CHANNEL semantic, do nothing */
166                 if (rd->rdma_protocol == FIO_RDMA_CHA_SEND)
167                         rd->rdma_protocol = FIO_RDMA_CHA_RECV;
168         }
169
170         return 0;
171 }
172
173 static int cq_event_handler(struct thread_data *td, enum ibv_wc_opcode opcode)
174 {
175         struct rdmaio_data *rd = td->io_ops->data;
176         struct ibv_wc wc;
177         struct rdma_io_u_data *r_io_u_d;
178         int ret;
179         int compevnum = 0;
180         int i;
181
182         while ((ret = ibv_poll_cq(rd->cq, 1, &wc)) == 1) {
183                 ret = 0;
184                 compevnum++;
185
186                 if (wc.status) {
187                         log_err("fio: cq completion status %d(%s)\n",
188                                 wc.status, ibv_wc_status_str(wc.status));
189                         return -1;
190                 }
191
192                 switch (wc.opcode) {
193
194                 case IBV_WC_RECV:
195                         if (rd->is_client == 1)
196                                 client_recv(td, &wc);
197                         else
198                                 server_recv(td, &wc);
199
200                         if (wc.wr_id == FIO_RDMA_MAX_IO_DEPTH)
201                                 break;
202
203                         for (i = 0; i < rd->io_u_flight_nr; i++) {
204                                 r_io_u_d = rd->io_us_flight[i]->engine_data;
205
206                                 if (wc.wr_id == r_io_u_d->rq_wr.wr_id) {
207                                         rd->io_us_flight[i]->resid =
208                                             rd->io_us_flight[i]->buflen
209                                             - wc.byte_len;
210
211                                         rd->io_us_flight[i]->error = 0;
212
213                                         rd->io_us_completed[rd->
214                                                             io_u_completed_nr]
215                                             = rd->io_us_flight[i];
216                                         rd->io_u_completed_nr++;
217                                         break;
218                                 }
219                         }
220                         if (i == rd->io_u_flight_nr)
221                                 log_err("fio: recv wr %" PRId64 " not found\n",
222                                         wc.wr_id);
223                         else {
224                                 /* put the last one into middle of the list */
225                                 rd->io_us_flight[i] =
226                                     rd->io_us_flight[rd->io_u_flight_nr - 1];
227                                 rd->io_u_flight_nr--;
228                         }
229
230                         break;
231
232                 case IBV_WC_SEND:
233                 case IBV_WC_RDMA_WRITE:
234                 case IBV_WC_RDMA_READ:
235                         if (wc.wr_id == FIO_RDMA_MAX_IO_DEPTH)
236                                 break;
237
238                         for (i = 0; i < rd->io_u_flight_nr; i++) {
239                                 r_io_u_d = rd->io_us_flight[i]->engine_data;
240
241                                 if (wc.wr_id == r_io_u_d->sq_wr.wr_id) {
242                                         rd->io_us_completed[rd->
243                                                             io_u_completed_nr]
244                                             = rd->io_us_flight[i];
245                                         rd->io_u_completed_nr++;
246                                         break;
247                                 }
248                         }
249                         if (i == rd->io_u_flight_nr)
250                                 log_err("fio: send wr %" PRId64 " not found\n",
251                                         wc.wr_id);
252                         else {
253                                 /* put the last one into middle of the list */
254                                 rd->io_us_flight[i] =
255                                     rd->io_us_flight[rd->io_u_flight_nr - 1];
256                                 rd->io_u_flight_nr--;
257                         }
258
259                         break;
260
261                 default:
262                         log_info("fio: unknown completion event %d\n",
263                                  wc.opcode);
264                         return -1;
265                 }
266                 rd->cq_event_num++;
267         }
268         if (ret) {
269                 log_err("fio: poll error %d\n", ret);
270                 return 1;
271         }
272
273         return compevnum;
274 }
275
276 /*
277  * Return -1 for error and 'nr events' for a positive number
278  * of events
279  */
280 static int rdma_poll_wait(struct thread_data *td, enum ibv_wc_opcode opcode)
281 {
282         struct rdmaio_data *rd = td->io_ops->data;
283         struct ibv_cq *ev_cq;
284         void *ev_ctx;
285         int ret;
286
287         if (rd->cq_event_num > 0) {     /* previous left */
288                 rd->cq_event_num--;
289                 return 0;
290         }
291
292 again:
293         if (ibv_get_cq_event(rd->channel, &ev_cq, &ev_ctx) != 0) {
294                 log_err("fio: Failed to get cq event!\n");
295                 return -1;
296         }
297         if (ev_cq != rd->cq) {
298                 log_err("fio: Unknown CQ!\n");
299                 return -1;
300         }
301         if (ibv_req_notify_cq(rd->cq, 0) != 0) {
302                 log_err("fio: Failed to set notify!\n");
303                 return -1;
304         }
305
306         ret = cq_event_handler(td, opcode);
307         if (ret < 1)
308                 goto again;
309
310         ibv_ack_cq_events(rd->cq, ret);
311
312         rd->cq_event_num--;
313
314         return ret;
315 }
316
317 static int fio_rdmaio_setup_qp(struct thread_data *td)
318 {
319         struct rdmaio_data *rd = td->io_ops->data;
320         struct ibv_qp_init_attr init_attr;
321         int qp_depth = td->o.iodepth * 2;       /* 2 times of io depth */
322
323         if (rd->is_client == 0)
324                 rd->pd = ibv_alloc_pd(rd->child_cm_id->verbs);
325         else
326                 rd->pd = ibv_alloc_pd(rd->cm_id->verbs);
327         if (rd->pd == NULL) {
328                 log_err("fio: ibv_alloc_pd fail\n");
329                 return 1;
330         }
331
332         if (rd->is_client == 0)
333                 rd->channel = ibv_create_comp_channel(rd->child_cm_id->verbs);
334         else
335                 rd->channel = ibv_create_comp_channel(rd->cm_id->verbs);
336         if (rd->channel == NULL) {
337                 log_err("fio: ibv_create_comp_channel fail\n");
338                 goto err1;
339         }
340
341         if (qp_depth < 16)
342                 qp_depth = 16;
343
344         if (rd->is_client == 0)
345                 rd->cq = ibv_create_cq(rd->child_cm_id->verbs,
346                                        qp_depth, rd, rd->channel, 0);
347         else
348                 rd->cq = ibv_create_cq(rd->cm_id->verbs,
349                                        qp_depth, rd, rd->channel, 0);
350         if (rd->cq == NULL) {
351                 log_err("fio: ibv_create_cq failed\n");
352                 goto err2;
353         }
354
355         if (ibv_req_notify_cq(rd->cq, 0) != 0) {
356                 log_err("fio: ibv_create_cq failed\n");
357                 goto err3;
358         }
359
360         /* create queue pair */
361         memset(&init_attr, 0, sizeof(init_attr));
362         init_attr.cap.max_send_wr = qp_depth;
363         init_attr.cap.max_recv_wr = qp_depth;
364         init_attr.cap.max_recv_sge = 1;
365         init_attr.cap.max_send_sge = 1;
366         init_attr.qp_type = IBV_QPT_RC;
367         init_attr.send_cq = rd->cq;
368         init_attr.recv_cq = rd->cq;
369
370         if (rd->is_client == 0) {
371                 if (rdma_create_qp(rd->child_cm_id, rd->pd, &init_attr) != 0) {
372                         log_err("fio: rdma_create_qp failed\n");
373                         goto err3;
374                 }
375                 rd->qp = rd->child_cm_id->qp;
376         } else {
377                 if (rdma_create_qp(rd->cm_id, rd->pd, &init_attr) != 0) {
378                         log_err("fio: rdma_create_qp failed\n");
379                         goto err3;
380                 }
381                 rd->qp = rd->cm_id->qp;
382         }
383
384         return 0;
385
386 err3:
387         ibv_destroy_cq(rd->cq);
388 err2:
389         ibv_destroy_comp_channel(rd->channel);
390 err1:
391         ibv_dealloc_pd(rd->pd);
392
393         return 1;
394 }
395
396 static int fio_rdmaio_setup_control_msg_buffers(struct thread_data *td)
397 {
398         struct rdmaio_data *rd = td->io_ops->data;
399
400         rd->recv_mr = ibv_reg_mr(rd->pd, &rd->recv_buf, sizeof(rd->recv_buf),
401                                  IBV_ACCESS_LOCAL_WRITE);
402         if (rd->recv_mr == NULL) {
403                 log_err("fio: recv_buf reg_mr failed\n");
404                 return 1;
405         }
406
407         rd->send_mr = ibv_reg_mr(rd->pd, &rd->send_buf, sizeof(rd->send_buf),
408                                  0);
409         if (rd->send_mr == NULL) {
410                 log_err("fio: send_buf reg_mr failed\n");
411                 ibv_dereg_mr(rd->recv_mr);
412                 return 1;
413         }
414
415         /* setup work request */
416         /* recv wq */
417         rd->recv_sgl.addr = (uint64_t) (unsigned long)&rd->recv_buf;
418         rd->recv_sgl.length = sizeof rd->recv_buf;
419         rd->recv_sgl.lkey = rd->recv_mr->lkey;
420         rd->rq_wr.sg_list = &rd->recv_sgl;
421         rd->rq_wr.num_sge = 1;
422         rd->rq_wr.wr_id = FIO_RDMA_MAX_IO_DEPTH;
423
424         /* send wq */
425         rd->send_sgl.addr = (uint64_t) (unsigned long)&rd->send_buf;
426         rd->send_sgl.length = sizeof rd->send_buf;
427         rd->send_sgl.lkey = rd->send_mr->lkey;
428
429         rd->sq_wr.opcode = IBV_WR_SEND;
430         rd->sq_wr.send_flags = IBV_SEND_SIGNALED;
431         rd->sq_wr.sg_list = &rd->send_sgl;
432         rd->sq_wr.num_sge = 1;
433         rd->sq_wr.wr_id = FIO_RDMA_MAX_IO_DEPTH;
434
435         return 0;
436 }
437
438 static int get_next_channel_event(struct thread_data *td,
439                                   struct rdma_event_channel *channel,
440                                   enum rdma_cm_event_type wait_event)
441 {
442         struct rdmaio_data *rd = td->io_ops->data;
443
444         int ret;
445         struct rdma_cm_event *event;
446
447         ret = rdma_get_cm_event(channel, &event);
448         if (ret) {
449                 log_err("fio: rdma_get_cm_event");
450                 return 1;
451         }
452
453         if (event->event != wait_event) {
454                 log_err("fio: event is %s instead of %s\n",
455                         rdma_event_str(event->event),
456                         rdma_event_str(wait_event));
457                 return 1;
458         }
459
460         switch (event->event) {
461         case RDMA_CM_EVENT_CONNECT_REQUEST:
462                 rd->child_cm_id = event->id;
463                 break;
464         default:
465                 break;
466         }
467
468         rdma_ack_cm_event(event);
469
470         return 0;
471 }
472
473 static int fio_rdmaio_prep(struct thread_data *td, struct io_u *io_u)
474 {
475         struct rdmaio_data *rd = td->io_ops->data;
476         struct rdma_io_u_data *r_io_u_d;
477
478         r_io_u_d = io_u->engine_data;
479
480         switch (rd->rdma_protocol) {
481         case FIO_RDMA_MEM_WRITE:
482         case FIO_RDMA_MEM_READ:
483                 r_io_u_d->rdma_sgl.addr = (uint64_t) (unsigned long)io_u->buf;
484                 r_io_u_d->rdma_sgl.lkey = io_u->mr->lkey;
485                 r_io_u_d->sq_wr.wr_id = r_io_u_d->wr_id;
486                 r_io_u_d->sq_wr.send_flags = IBV_SEND_SIGNALED;
487                 r_io_u_d->sq_wr.sg_list = &r_io_u_d->rdma_sgl;
488                 r_io_u_d->sq_wr.num_sge = 1;
489                 break;
490         case FIO_RDMA_CHA_SEND:
491                 r_io_u_d->rdma_sgl.addr = (uint64_t) (unsigned long)io_u->buf;
492                 r_io_u_d->rdma_sgl.lkey = io_u->mr->lkey;
493                 r_io_u_d->rdma_sgl.length = io_u->buflen;
494                 r_io_u_d->sq_wr.wr_id = r_io_u_d->wr_id;
495                 r_io_u_d->sq_wr.opcode = IBV_WR_SEND;
496                 r_io_u_d->sq_wr.send_flags = IBV_SEND_SIGNALED;
497                 r_io_u_d->sq_wr.sg_list = &r_io_u_d->rdma_sgl;
498                 r_io_u_d->sq_wr.num_sge = 1;
499                 break;
500         case FIO_RDMA_CHA_RECV:
501                 r_io_u_d->rdma_sgl.addr = (uint64_t) (unsigned long)io_u->buf;
502                 r_io_u_d->rdma_sgl.lkey = io_u->mr->lkey;
503                 r_io_u_d->rdma_sgl.length = io_u->buflen;
504                 r_io_u_d->rq_wr.wr_id = r_io_u_d->wr_id;
505                 r_io_u_d->rq_wr.sg_list = &r_io_u_d->rdma_sgl;
506                 r_io_u_d->rq_wr.num_sge = 1;
507                 break;
508         default:
509                 log_err("fio: unknown rdma protocol - %d\n", rd->rdma_protocol);
510                 break;
511         }
512
513         return 0;
514 }
515
516 static struct io_u *fio_rdmaio_event(struct thread_data *td, int event)
517 {
518         struct rdmaio_data *rd = td->io_ops->data;
519         struct io_u *io_u;
520         int i;
521
522         io_u = rd->io_us_completed[0];
523         for (i = 0; i < rd->io_u_completed_nr - 1; i++) {
524                 rd->io_us_completed[i] = rd->io_us_completed[i + 1];
525         }
526         rd->io_u_completed_nr--;
527
528         dprint_io_u(io_u, "fio_rdmaio_event");
529
530         return io_u;
531 }
532
533 static int fio_rdmaio_getevents(struct thread_data *td, unsigned int min,
534                                 unsigned int max, struct timespec *t)
535 {
536         struct rdmaio_data *rd = td->io_ops->data;
537         int r;
538         enum ibv_wc_opcode comp_opcode;
539         comp_opcode = IBV_WC_RDMA_WRITE;
540         struct ibv_cq *ev_cq;
541         void *ev_ctx;
542         int ret;
543
544         r = 0;
545
546         switch (rd->rdma_protocol) {
547         case FIO_RDMA_MEM_WRITE:
548                 comp_opcode = IBV_WC_RDMA_WRITE;
549                 break;
550         case FIO_RDMA_MEM_READ:
551                 comp_opcode = IBV_WC_RDMA_READ;
552                 break;
553         case FIO_RDMA_CHA_SEND:
554                 comp_opcode = IBV_WC_SEND;
555                 break;
556         case FIO_RDMA_CHA_RECV:
557                 comp_opcode = IBV_WC_RECV;
558                 break;
559         default:
560                 log_err("fio: unknown rdma protocol - %d\n", rd->rdma_protocol);
561                 break;
562         }
563
564         if (rd->cq_event_num > 0) {     /* previous left */
565                 rd->cq_event_num--;
566                 return 0;
567         }
568
569 again:
570         if (ibv_get_cq_event(rd->channel, &ev_cq, &ev_ctx) != 0) {
571                 log_err("fio: Failed to get cq event!\n");
572                 return -1;
573         }
574         if (ev_cq != rd->cq) {
575                 log_err("fio: Unknown CQ!\n");
576                 return -1;
577         }
578         if (ibv_req_notify_cq(rd->cq, 0) != 0) {
579                 log_err("fio: Failed to set notify!\n");
580                 return -1;
581         }
582
583         ret = cq_event_handler(td, comp_opcode);
584         if (ret < 1)
585                 goto again;
586
587         ibv_ack_cq_events(rd->cq, ret);
588
589         r += ret;
590         if (r < min)
591                 goto again;
592
593         rd->cq_event_num -= r;
594
595         return r;
596 }
597
598 static int fio_rdmaio_send(struct thread_data *td, struct io_u **io_us,
599                            unsigned int nr)
600 {
601         struct rdmaio_data *rd = td->io_ops->data;
602         struct ibv_send_wr *bad_wr;
603 #if 0
604         enum ibv_wc_opcode comp_opcode;
605         comp_opcode = IBV_WC_RDMA_WRITE;
606 #endif
607         int i;
608         long index;
609         struct rdma_io_u_data *r_io_u_d;
610
611         r_io_u_d = NULL;
612
613         for (i = 0; i < nr; i++) {
614                 /* RDMA_WRITE or RDMA_READ */
615                 switch (rd->rdma_protocol) {
616                 case FIO_RDMA_MEM_WRITE:
617                         /* compose work request */
618                         r_io_u_d = io_us[i]->engine_data;
619                         if (td->o.use_os_rand)
620                                 index = os_random_long(&td->random_state) % rd->rmt_nr;
621                         else
622                                 index = __rand(&rd->rand_state) % rd->rmt_nr;
623                         r_io_u_d->sq_wr.opcode = IBV_WR_RDMA_WRITE;
624                         r_io_u_d->sq_wr.wr.rdma.rkey = rd->rmt_us[index].rkey;
625                         r_io_u_d->sq_wr.wr.rdma.remote_addr = \
626                                 rd->rmt_us[index].buf;
627                         r_io_u_d->sq_wr.sg_list->length = io_us[i]->buflen;
628                         break;
629                 case FIO_RDMA_MEM_READ:
630                         /* compose work request */
631                         r_io_u_d = io_us[i]->engine_data;
632                         if (td->o.use_os_rand)
633                                 index = os_random_long(&td->random_state) % rd->rmt_nr;
634                         else
635                                 index = __rand(&rd->rand_state) % rd->rmt_nr;
636                         r_io_u_d->sq_wr.opcode = IBV_WR_RDMA_READ;
637                         r_io_u_d->sq_wr.wr.rdma.rkey = rd->rmt_us[index].rkey;
638                         r_io_u_d->sq_wr.wr.rdma.remote_addr = \
639                                 rd->rmt_us[index].buf;
640                         r_io_u_d->sq_wr.sg_list->length = io_us[i]->buflen;
641                         break;
642                 case FIO_RDMA_CHA_SEND:
643                         r_io_u_d = io_us[i]->engine_data;
644                         r_io_u_d->sq_wr.opcode = IBV_WR_SEND;
645                         r_io_u_d->sq_wr.send_flags = IBV_SEND_SIGNALED;
646                         break;
647                 default:
648                         log_err("fio: unknown rdma protocol - %d\n",
649                                 rd->rdma_protocol);
650                         break;
651                 }
652
653                 if (ibv_post_send(rd->qp, &r_io_u_d->sq_wr, &bad_wr) != 0) {
654                         log_err("fio: ibv_post_send fail\n");
655                         return -1;
656                 }
657
658                 dprint_io_u(io_us[i], "fio_rdmaio_send");
659         }
660
661         /* wait for completion
662            rdma_poll_wait(td, comp_opcode); */
663
664         return i;
665 }
666
667 static int fio_rdmaio_recv(struct thread_data *td, struct io_u **io_us,
668                            unsigned int nr)
669 {
670         struct rdmaio_data *rd = td->io_ops->data;
671         struct ibv_recv_wr *bad_wr;
672         struct rdma_io_u_data *r_io_u_d;
673         int i;
674
675         i = 0;
676         if (rd->rdma_protocol == FIO_RDMA_CHA_RECV) {
677                 /* post io_u into recv queue */
678                 for (i = 0; i < nr; i++) {
679                         r_io_u_d = io_us[i]->engine_data;
680                         if (ibv_post_recv(rd->qp, &r_io_u_d->rq_wr, &bad_wr) !=
681                             0) {
682                                 log_err("fio: ibv_post_recv fail\n");
683                                 return 1;
684                         }
685                 }
686         } else if ((rd->rdma_protocol == FIO_RDMA_MEM_READ)
687                    || (rd->rdma_protocol == FIO_RDMA_MEM_WRITE)) {
688                 /* re-post the rq_wr */
689                 if (ibv_post_recv(rd->qp, &rd->rq_wr, &bad_wr) != 0) {
690                         log_err("fio: ibv_post_recv fail\n");
691                         return 1;
692                 }
693
694                 rdma_poll_wait(td, IBV_WC_RECV);
695
696                 dprint(FD_IO, "fio: recv FINISH message\n");
697                 td->done = 1;
698                 return 0;
699         }
700
701         return i;
702 }
703
704 static int fio_rdmaio_queue(struct thread_data *td, struct io_u *io_u)
705 {
706         struct rdmaio_data *rd = td->io_ops->data;
707
708         fio_ro_check(td, io_u);
709
710         if (rd->io_u_queued_nr == (int)td->o.iodepth)
711                 return FIO_Q_BUSY;
712
713         rd->io_us_queued[rd->io_u_queued_nr] = io_u;
714         rd->io_u_queued_nr++;
715
716         dprint_io_u(io_u, "fio_rdmaio_queue");
717
718         return FIO_Q_QUEUED;
719 }
720
721 static void fio_rdmaio_queued(struct thread_data *td, struct io_u **io_us,
722                               unsigned int nr)
723 {
724         struct rdmaio_data *rd = td->io_ops->data;
725         struct timeval now;
726         unsigned int i;
727
728         if (!fio_fill_issue_time(td))
729                 return;
730
731         fio_gettime(&now, NULL);
732
733         for (i = 0; i < nr; i++) {
734                 struct io_u *io_u = io_us[i];
735
736                 /* queued -> flight */
737                 rd->io_us_flight[rd->io_u_flight_nr] = io_u;
738                 rd->io_u_flight_nr++;
739
740                 memcpy(&io_u->issue_time, &now, sizeof(now));
741                 io_u_queued(td, io_u);
742         }
743 }
744
745 static int fio_rdmaio_commit(struct thread_data *td)
746 {
747         struct rdmaio_data *rd = td->io_ops->data;
748         struct io_u **io_us;
749         int ret;
750
751         if (!rd->io_us_queued)
752                 return 0;
753
754         io_us = rd->io_us_queued;
755         do {
756                 /* RDMA_WRITE or RDMA_READ */
757                 if (rd->is_client) {
758                         ret = fio_rdmaio_send(td, io_us, rd->io_u_queued_nr);
759                 } else if (!rd->is_client) {
760                         ret = fio_rdmaio_recv(td, io_us, rd->io_u_queued_nr);
761                 } else
762                         ret = 0;        /* must be a SYNC */
763
764                 if (ret > 0) {
765                         fio_rdmaio_queued(td, io_us, ret);
766                         io_u_mark_submit(td, ret);
767                         rd->io_u_queued_nr -= ret;
768                         io_us += ret;
769                         ret = 0;
770                 } else
771                         break;
772         } while (rd->io_u_queued_nr);
773
774         return ret;
775 }
776
777 static int fio_rdmaio_connect(struct thread_data *td, struct fio_file *f)
778 {
779         struct rdmaio_data *rd = td->io_ops->data;
780         struct rdma_conn_param conn_param;
781         struct ibv_send_wr *bad_wr;
782
783         memset(&conn_param, 0, sizeof conn_param);
784         conn_param.responder_resources = 1;
785         conn_param.initiator_depth = 1;
786         conn_param.retry_count = 10;
787
788         if (rdma_connect(rd->cm_id, &conn_param) != 0) {
789                 log_err("fio: rdma_connect fail\n");
790                 return 1;
791         }
792
793         if (get_next_channel_event
794             (td, rd->cm_channel, RDMA_CM_EVENT_ESTABLISHED) != 0) {
795                 log_err("fio: wait for RDMA_CM_EVENT_ESTABLISHED\n");
796                 return 1;
797         }
798
799         /* send task request */
800         rd->send_buf.mode = htonl(rd->rdma_protocol);
801         rd->send_buf.nr = htonl(td->o.iodepth);
802
803         if (ibv_post_send(rd->qp, &rd->sq_wr, &bad_wr) != 0) {
804                 log_err("fio: ibv_post_send fail");
805                 return 1;
806         }
807
808         rdma_poll_wait(td, IBV_WC_SEND);
809
810         /* wait for remote MR info from server side */
811         rdma_poll_wait(td, IBV_WC_RECV);
812
813         /* In SEND/RECV test, it's a good practice to setup the iodepth of
814          * of the RECV side deeper than that of the SEND side to
815          * avoid RNR (receiver not ready) error. The
816          * SEND side may send so many unsolicited message before 
817          * RECV side commits sufficient recv buffers into recv queue.
818          * This may lead to RNR error. Here, SEND side pauses for a while
819          * during which RECV side commits sufficient recv buffers.
820          */
821         usleep(500000);
822
823         return 0;
824 }
825
826 static int fio_rdmaio_accept(struct thread_data *td, struct fio_file *f)
827 {
828         struct rdmaio_data *rd = td->io_ops->data;
829         struct rdma_conn_param conn_param;
830         struct ibv_send_wr *bad_wr;
831
832         /* rdma_accept() - then wait for accept success */
833         memset(&conn_param, 0, sizeof conn_param);
834         conn_param.responder_resources = 1;
835         conn_param.initiator_depth = 1;
836
837         if (rdma_accept(rd->child_cm_id, &conn_param) != 0) {
838                 log_err("fio: rdma_accept\n");
839                 return 1;
840         }
841
842         if (get_next_channel_event
843             (td, rd->cm_channel, RDMA_CM_EVENT_ESTABLISHED) != 0) {
844                 log_err("fio: wait for RDMA_CM_EVENT_ESTABLISHED\n");
845                 return 1;
846         }
847
848         /* wait for request */
849         rdma_poll_wait(td, IBV_WC_RECV);
850
851         if (ibv_post_send(rd->qp, &rd->sq_wr, &bad_wr) != 0) {
852                 log_err("fio: ibv_post_send fail");
853                 return 1;
854         }
855
856         rdma_poll_wait(td, IBV_WC_SEND);
857
858         return 0;
859 }
860
861 static int fio_rdmaio_open_file(struct thread_data *td, struct fio_file *f)
862 {
863         if (td_read(td))
864                 return fio_rdmaio_accept(td, f);
865         else
866                 return fio_rdmaio_connect(td, f);
867 }
868
869 static int fio_rdmaio_close_file(struct thread_data *td, struct fio_file *f)
870 {
871         struct rdmaio_data *rd = td->io_ops->data;
872         struct ibv_send_wr *bad_wr;
873
874         /* unregister rdma buffer */
875
876         /*
877          * Client sends notification to the server side
878          */
879         /* refer to: http://linux.die.net/man/7/rdma_cm */
880         if ((rd->is_client == 1) && ((rd->rdma_protocol == FIO_RDMA_MEM_WRITE)
881                                      || (rd->rdma_protocol ==
882                                          FIO_RDMA_MEM_READ))) {
883                 if (ibv_post_send(rd->qp, &rd->sq_wr, &bad_wr) != 0) {
884                         log_err("fio: ibv_post_send fail");
885                         return 1;
886                 }
887
888                 dprint(FD_IO, "fio: close infomation sent success\n");
889                 rdma_poll_wait(td, IBV_WC_SEND);
890         }
891
892         if (rd->is_client == 1)
893                 rdma_disconnect(rd->cm_id);
894         else {
895                 rdma_disconnect(rd->child_cm_id);
896 /*        rdma_disconnect(rd->cm_id); */
897         }
898
899 /*    if (get_next_channel_event(td, rd->cm_channel, RDMA_CM_EVENT_DISCONNECTED) != 0)
900     {
901         log_err("fio: wait for RDMA_CM_EVENT_DISCONNECTED\n");
902         return 1;
903     }*/
904
905         ibv_destroy_cq(rd->cq);
906         ibv_destroy_qp(rd->qp);
907
908         if (rd->is_client == 1)
909                 rdma_destroy_id(rd->cm_id);
910         else {
911                 rdma_destroy_id(rd->child_cm_id);
912                 rdma_destroy_id(rd->cm_id);
913         }
914
915         ibv_destroy_comp_channel(rd->channel);
916         ibv_dealloc_pd(rd->pd);
917
918         return 0;
919 }
920
921 static int fio_rdmaio_setup_connect(struct thread_data *td, const char *host,
922                                     unsigned short port)
923 {
924         struct rdmaio_data *rd = td->io_ops->data;
925         struct ibv_recv_wr *bad_wr;
926
927         rd->addr.sin_family = AF_INET;
928         rd->addr.sin_port = htons(port);
929
930         if (inet_aton(host, &rd->addr.sin_addr) != 1) {
931                 struct hostent *hent;
932
933                 hent = gethostbyname(host);
934                 if (!hent) {
935                         td_verror(td, errno, "gethostbyname");
936                         return 1;
937                 }
938
939                 memcpy(&rd->addr.sin_addr, hent->h_addr, 4);
940         }
941
942         /* resolve route */
943         if (rdma_resolve_addr(rd->cm_id, NULL,
944                               (struct sockaddr *)&rd->addr, 2000) != 0) {
945                 log_err("fio: rdma_resolve_addr");
946                 return 1;
947         }
948
949         if (get_next_channel_event
950             (td, rd->cm_channel, RDMA_CM_EVENT_ADDR_RESOLVED)
951             != 0) {
952                 log_err("fio: get_next_channel_event");
953                 return 1;
954         }
955
956         /* resolve route */
957         if (rdma_resolve_route(rd->cm_id, 2000) != 0) {
958                 log_err("fio: rdma_resolve_route");
959                 return 1;
960         }
961
962         if (get_next_channel_event
963             (td, rd->cm_channel, RDMA_CM_EVENT_ROUTE_RESOLVED) != 0) {
964                 log_err("fio: get_next_channel_event");
965                 return 1;
966         }
967
968         /* create qp and buffer */
969         if (fio_rdmaio_setup_qp(td) != 0)
970                 return 1;
971
972         if (fio_rdmaio_setup_control_msg_buffers(td) != 0)
973                 return 1;
974
975         /* post recv buf */
976         if (ibv_post_recv(rd->qp, &rd->rq_wr, &bad_wr) != 0) {
977                 log_err("fio: ibv_post_recv fail\n");
978                 return 1;
979         }
980
981         return 0;
982 }
983
984 static int fio_rdmaio_setup_listen(struct thread_data *td, short port)
985 {
986         struct rdmaio_data *rd = td->io_ops->data;
987         struct ibv_recv_wr *bad_wr;
988
989         rd->addr.sin_family = AF_INET;
990         rd->addr.sin_addr.s_addr = htonl(INADDR_ANY);
991         rd->addr.sin_port = htons(port);
992
993         /* rdma_listen */
994         if (rdma_bind_addr(rd->cm_id, (struct sockaddr *)&rd->addr) != 0) {
995                 log_err("fio: rdma_bind_addr fail\n");
996                 return 1;
997         }
998
999         if (rdma_listen(rd->cm_id, 3) != 0) {
1000                 log_err("fio: rdma_listen fail\n");
1001                 return 1;
1002         }
1003
1004         /* wait for CONNECT_REQUEST */
1005         if (get_next_channel_event
1006             (td, rd->cm_channel, RDMA_CM_EVENT_CONNECT_REQUEST) != 0) {
1007                 log_err("fio: wait for RDMA_CM_EVENT_CONNECT_REQUEST\n");
1008                 return 1;
1009         }
1010
1011         if (fio_rdmaio_setup_qp(td) != 0)
1012                 return 1;
1013
1014         if (fio_rdmaio_setup_control_msg_buffers(td) != 0)
1015                 return 1;
1016
1017         /* post recv buf */
1018         if (ibv_post_recv(rd->qp, &rd->rq_wr, &bad_wr) != 0) {
1019                 log_err("fio: ibv_post_recv fail\n");
1020                 return 1;
1021         }
1022
1023         return 0;
1024 }
1025
1026 static int fio_rdmaio_init(struct thread_data *td)
1027 {
1028         struct rdmaio_data *rd = td->io_ops->data;
1029         unsigned int port;
1030         char host[64], buf[128];
1031         char *sep, *portp, *modep;
1032         int ret;
1033         struct rlimit rl;
1034
1035         if (td_rw(td)) {
1036                 log_err("fio: rdma connections must be read OR write\n");
1037                 return 1;
1038         }
1039         if (td_random(td)) {
1040                 log_err("fio: RDMA network IO can't be random\n");
1041                 return 1;
1042         }
1043
1044         /* check RLIMIT_MEMLOCK */
1045         if (getrlimit(RLIMIT_MEMLOCK, &rl) != 0) {
1046                 log_err("fio: getrlimit fail: %d(%s)\n",
1047                         errno, strerror(errno));
1048                 return 1;
1049         }
1050
1051         /* soft limit */
1052         if ((rl.rlim_cur != RLIM_INFINITY)
1053             && (rl.rlim_cur < td->orig_buffer_size)) {
1054                 log_err("fio: soft RLIMIT_MEMLOCK is: %" PRId64 "\n",
1055                         rl.rlim_cur);
1056                 log_err("fio: total block size is:    %zd\n",
1057                         td->orig_buffer_size);
1058                 /* try to set larger RLIMIT_MEMLOCK */
1059                 rl.rlim_cur = rl.rlim_max;
1060                 if (setrlimit(RLIMIT_MEMLOCK, &rl) != 0) {
1061                         log_err("fio: setrlimit fail: %d(%s)\n",
1062                                 errno, strerror(errno));
1063                         log_err("fio: you may try enlarge MEMLOCK by root\n");
1064                         log_err("# ulimit -l unlimited\n");
1065                         return 1;
1066                 }
1067         }
1068
1069         strcpy(buf, td->o.filename);
1070
1071         sep = strchr(buf, '/');
1072         if (!sep)
1073                 goto bad_host;
1074
1075         *sep = '\0';
1076         sep++;
1077         strcpy(host, buf);
1078         if (!strlen(host))
1079                 goto bad_host;
1080
1081         modep = NULL;
1082         portp = sep;
1083         sep = strchr(portp, '/');
1084         if (sep) {
1085                 *sep = '\0';
1086                 modep = sep + 1;
1087         }
1088
1089         port = strtol(portp, NULL, 10);
1090         if (!port || port > 65535)
1091                 goto bad_host;
1092
1093         if (modep) {
1094                 if (!strncmp("rdma_write", modep, strlen(modep)) ||
1095                     !strncmp("RDMA_WRITE", modep, strlen(modep)))
1096                         rd->rdma_protocol = FIO_RDMA_MEM_WRITE;
1097                 else if (!strncmp("rdma_read", modep, strlen(modep)) ||
1098                          !strncmp("RDMA_READ", modep, strlen(modep)))
1099                         rd->rdma_protocol = FIO_RDMA_MEM_READ;
1100                 else if (!strncmp("send", modep, strlen(modep)) ||
1101                          !strncmp("SEND", modep, strlen(modep)))
1102                         rd->rdma_protocol = FIO_RDMA_CHA_SEND;
1103                 else
1104                         goto bad_host;
1105         } else
1106                 rd->rdma_protocol = FIO_RDMA_MEM_WRITE;
1107
1108         rd->cq_event_num = 0;
1109
1110         rd->cm_channel = rdma_create_event_channel();
1111         if (!rd->cm_channel) {
1112                 log_err("fio: rdma_create_event_channel fail\n");
1113                 return 1;
1114         }
1115
1116         ret = rdma_create_id(rd->cm_channel, &rd->cm_id, rd, RDMA_PS_TCP);
1117         if (ret) {
1118                 log_err("fio: rdma_create_id fail\n");
1119                 return 1;
1120         }
1121
1122         if ((rd->rdma_protocol == FIO_RDMA_MEM_WRITE) ||
1123             (rd->rdma_protocol == FIO_RDMA_MEM_READ)) {
1124                 rd->rmt_us =
1125                         malloc(FIO_RDMA_MAX_IO_DEPTH * sizeof(struct remote_u));
1126                 memset(rd->rmt_us, 0,
1127                         FIO_RDMA_MAX_IO_DEPTH * sizeof(struct remote_u));
1128                 rd->rmt_nr = 0;
1129         }
1130
1131         rd->io_us_queued = malloc(td->o.iodepth * sizeof(struct io_u *));
1132         memset(rd->io_us_queued, 0, td->o.iodepth * sizeof(struct io_u *));
1133         rd->io_u_queued_nr = 0;
1134
1135         rd->io_us_flight = malloc(td->o.iodepth * sizeof(struct io_u *));
1136         memset(rd->io_us_flight, 0, td->o.iodepth * sizeof(struct io_u *));
1137         rd->io_u_flight_nr = 0;
1138
1139         rd->io_us_completed = malloc(td->o.iodepth * sizeof(struct io_u *));
1140         memset(rd->io_us_completed, 0, td->o.iodepth * sizeof(struct io_u *));
1141         rd->io_u_completed_nr = 0;
1142
1143         if (td_read(td)) {      /* READ as the server */
1144                 rd->is_client = 0;
1145                 /* server rd->rdma_buf_len will be setup after got request */
1146                 ret = fio_rdmaio_setup_listen(td, port);
1147         } else {                /* WRITE as the client */
1148                 rd->is_client = 1;
1149                 ret = fio_rdmaio_setup_connect(td, host, port);
1150         }
1151
1152         struct flist_head *entry;
1153         unsigned int max_bs;
1154         max_bs = max(td->o.max_bs[DDIR_READ], td->o.max_bs[DDIR_WRITE]);
1155         /* register each io_u in the free list */
1156         int i = 0;
1157         flist_for_each(entry, &td->io_u_freelist) {
1158                 struct io_u *io_u = flist_entry(entry, struct io_u, list);
1159
1160                 io_u->engine_data = malloc(sizeof(struct rdma_io_u_data));
1161                 memset(io_u->engine_data, 0, sizeof(struct rdma_io_u_data));
1162                 ((struct rdma_io_u_data *)io_u->engine_data)->wr_id = i;
1163
1164                 io_u->mr = ibv_reg_mr(rd->pd, io_u->buf, max_bs,
1165                                       IBV_ACCESS_LOCAL_WRITE |
1166                                       IBV_ACCESS_REMOTE_READ |
1167                                       IBV_ACCESS_REMOTE_WRITE);
1168                 if (io_u->mr == NULL) {
1169                         log_err("fio: ibv_reg_mr io_u failed\n");
1170                         return 1;
1171                 }
1172
1173                 rd->send_buf.rmt_us[i].buf =
1174                     htonll((uint64_t) (unsigned long)io_u->buf);
1175                 rd->send_buf.rmt_us[i].rkey = htonl(io_u->mr->rkey);
1176                 rd->send_buf.rmt_us[i].size = htonl(max_bs);
1177
1178 /*    log_info("fio: Send rkey %x addr %" PRIx64 " len %d to client\n",
1179           io_u->mr->rkey, io_u->buf, max_bs); */
1180                 i++;
1181         }
1182
1183         rd->send_buf.nr = htonl(i);
1184
1185         return ret;
1186 bad_host:
1187         log_err("fio: bad rdma host/port/protocol: %s\n", td->o.filename);
1188         return 1;
1189 }
1190
1191 static void fio_rdmaio_cleanup(struct thread_data *td)
1192 {
1193         struct rdmaio_data *rd = td->io_ops->data;
1194
1195         if (rd) {
1196 /*        if (nd->listenfd != -1)
1197             close(nd->listenfd);
1198         if (nd->pipes[0] != -1)
1199             close(nd->pipes[0]);
1200         if (nd->pipes[1] != -1)
1201             close(nd->pipes[1]);
1202 */
1203                 free(rd);
1204         }
1205 }
1206
1207 static int fio_rdmaio_setup(struct thread_data *td)
1208 {
1209         struct rdmaio_data *rd;
1210
1211         if (!td->io_ops->data) {
1212                 rd = malloc(sizeof(*rd));;
1213
1214                 memset(rd, 0, sizeof(*rd));
1215                 init_rand_seed(&rd->rand_state, GOLDEN_RATIO_PRIME);
1216                 td->io_ops->data = rd;
1217         }
1218
1219         return 0;
1220 }
1221
1222 static struct ioengine_ops ioengine_rw = {
1223         .name           = "rdma",
1224         .version        = FIO_IOOPS_VERSION,
1225         .setup          = fio_rdmaio_setup,
1226         .init           = fio_rdmaio_init,
1227         .prep           = fio_rdmaio_prep,
1228         .queue          = fio_rdmaio_queue,
1229         .commit         = fio_rdmaio_commit,
1230         .getevents      = fio_rdmaio_getevents,
1231         .event          = fio_rdmaio_event,
1232         .cleanup        = fio_rdmaio_cleanup,
1233         .open_file      = fio_rdmaio_open_file,
1234         .close_file     = fio_rdmaio_close_file,
1235         .flags          = FIO_DISKLESSIO | FIO_UNIDIR | FIO_PIPEIO,
1236 };
1237
1238 #else /* FIO_HAVE_RDMA */
1239
1240 static int fio_rdmaio_open_file(struct thread_data *td, struct fio_file *f)
1241 {
1242         return 0;
1243 }
1244
1245 static int fio_rdmaio_close_file(struct thread_data *td, struct fio_file *f)
1246 {
1247         return 0;
1248 }
1249
1250 static int fio_rdmaio_queue(struct thread_data *td, struct io_u *io_u)
1251 {
1252         return FIO_Q_COMPLETED;
1253 }
1254
1255 static int fio_rdmaio_init(struct thread_data fio_unused * td)
1256 {
1257         log_err("fio: rdma(librdmacm libibverbs) not available\n");
1258         log_err("     You haven't compiled rdma ioengine into fio.\n");
1259         log_err("     If you want to try rdma ioengine,\n");
1260         log_err("     make sure OFED is installed,\n");
1261         log_err("     $ ofed_info\n");
1262         log_err("     then try to make fio as follows:\n");
1263         log_err("     $ export EXTFLAGS+=\" -DFIO_HAVE_RDMA \"\n");
1264         log_err("     $ export EXTLIBS+=\" -libverbs -lrdmacm \"\n");
1265         log_err("     $ make clean && make\n");
1266         return 1;
1267 }
1268
1269 static struct ioengine_ops ioengine_rw = {
1270         .name           = "rdma",
1271         .version        = FIO_IOOPS_VERSION,
1272         .init           = fio_rdmaio_init,
1273         .queue          = fio_rdmaio_queue,
1274         .open_file      = fio_rdmaio_open_file,
1275         .close_file     = fio_rdmaio_close_file,
1276         .flags          = FIO_SYNCIO | FIO_DISKLESSIO | FIO_UNIDIR | FIO_PIPEIO,
1277 };
1278
1279 #endif
1280
1281 static void fio_init fio_rdmaio_register(void)
1282 {
1283         register_ioengine(&ioengine_rw);
1284 }
1285
1286 static void fio_exit fio_rdmaio_unregister(void)
1287 {
1288         unregister_ioengine(&ioengine_rw);
1289 }