libceph: new sparse_read op, support sparse reads on msgr2 crc codepath
[linux-2.6-block.git] / net / ceph / messenger_v2.c
index 1df1d29dee920bddbaff384fd36783191e699d51..17c9a858bfbd32248f62f43781babce6852a1b6b 100644 (file)
 #define FRAME_LATE_STATUS_COMPLETE     0xe
 #define FRAME_LATE_STATUS_ABORTED_MASK 0xf
 
-#define IN_S_HANDLE_PREAMBLE           1
-#define IN_S_HANDLE_CONTROL            2
-#define IN_S_HANDLE_CONTROL_REMAINDER  3
-#define IN_S_PREPARE_READ_DATA         4
-#define IN_S_PREPARE_READ_DATA_CONT    5
-#define IN_S_PREPARE_READ_ENC_PAGE     6
-#define IN_S_HANDLE_EPILOGUE           7
-#define IN_S_FINISH_SKIP               8
+#define IN_S_HANDLE_PREAMBLE                   1
+#define IN_S_HANDLE_CONTROL                    2
+#define IN_S_HANDLE_CONTROL_REMAINDER          3
+#define IN_S_PREPARE_READ_DATA                 4
+#define IN_S_PREPARE_READ_DATA_CONT            5
+#define IN_S_PREPARE_READ_ENC_PAGE             6
+#define IN_S_PREPARE_SPARSE_DATA               7
+#define IN_S_PREPARE_SPARSE_DATA_CONT          8
+#define IN_S_HANDLE_EPILOGUE                   9
+#define IN_S_FINISH_SKIP                       10
 
 #define OUT_S_QUEUE_DATA               1
 #define OUT_S_QUEUE_DATA_CONT          2
@@ -1825,6 +1827,123 @@ static void prepare_read_data_cont(struct ceph_connection *con)
        con->v2.in_state = IN_S_HANDLE_EPILOGUE;
 }
 
+static int prepare_sparse_read_cont(struct ceph_connection *con)
+{
+       int ret;
+       struct bio_vec bv;
+       char *buf = NULL;
+       struct ceph_msg_data_cursor *cursor = &con->v2.in_cursor;
+
+       WARN_ON(con->v2.in_state != IN_S_PREPARE_SPARSE_DATA_CONT);
+
+       if (iov_iter_is_bvec(&con->v2.in_iter)) {
+               if (ceph_test_opt(from_msgr(con->msgr), RXBOUNCE)) {
+                       con->in_data_crc = crc32c(con->in_data_crc,
+                                                 page_address(con->bounce_page),
+                                                 con->v2.in_bvec.bv_len);
+                       get_bvec_at(cursor, &bv);
+                       memcpy_to_page(bv.bv_page, bv.bv_offset,
+                                      page_address(con->bounce_page),
+                                      con->v2.in_bvec.bv_len);
+               } else {
+                       con->in_data_crc = ceph_crc32c_page(con->in_data_crc,
+                                                           con->v2.in_bvec.bv_page,
+                                                           con->v2.in_bvec.bv_offset,
+                                                           con->v2.in_bvec.bv_len);
+               }
+
+               ceph_msg_data_advance(cursor, con->v2.in_bvec.bv_len);
+               cursor->sr_resid -= con->v2.in_bvec.bv_len;
+               dout("%s: advance by 0x%x sr_resid 0x%x\n", __func__,
+                    con->v2.in_bvec.bv_len, cursor->sr_resid);
+               WARN_ON_ONCE(cursor->sr_resid > cursor->total_resid);
+               if (cursor->sr_resid) {
+                       get_bvec_at(cursor, &bv);
+                       if (bv.bv_len > cursor->sr_resid)
+                               bv.bv_len = cursor->sr_resid;
+                       if (ceph_test_opt(from_msgr(con->msgr), RXBOUNCE)) {
+                               bv.bv_page = con->bounce_page;
+                               bv.bv_offset = 0;
+                       }
+                       set_in_bvec(con, &bv);
+                       con->v2.data_len_remain -= bv.bv_len;
+                       return 0;
+               }
+       } else if (iov_iter_is_kvec(&con->v2.in_iter)) {
+               /* On first call, we have no kvec so don't compute crc */
+               if (con->v2.in_kvec_cnt) {
+                       WARN_ON_ONCE(con->v2.in_kvec_cnt > 1);
+                       con->in_data_crc = crc32c(con->in_data_crc,
+                                                 con->v2.in_kvecs[0].iov_base,
+                                                 con->v2.in_kvecs[0].iov_len);
+               }
+       } else {
+               return -EIO;
+       }
+
+       /* get next extent */
+       ret = con->ops->sparse_read(con, cursor, &buf);
+       if (ret <= 0) {
+               if (ret < 0)
+                       return ret;
+
+               reset_in_kvecs(con);
+               add_in_kvec(con, con->v2.in_buf, CEPH_EPILOGUE_PLAIN_LEN);
+               con->v2.in_state = IN_S_HANDLE_EPILOGUE;
+               return 0;
+       }
+
+       if (buf) {
+               /* receive into buffer */
+               reset_in_kvecs(con);
+               add_in_kvec(con, buf, ret);
+               con->v2.data_len_remain -= ret;
+               return 0;
+       }
+
+       if (ret > cursor->total_resid) {
+               pr_warn("%s: ret 0x%x total_resid 0x%zx resid 0x%zx\n",
+                       __func__, ret, cursor->total_resid, cursor->resid);
+               return -EIO;
+       }
+       get_bvec_at(cursor, &bv);
+       if (bv.bv_len > cursor->sr_resid)
+               bv.bv_len = cursor->sr_resid;
+       if (ceph_test_opt(from_msgr(con->msgr), RXBOUNCE)) {
+               if (unlikely(!con->bounce_page)) {
+                       con->bounce_page = alloc_page(GFP_NOIO);
+                       if (!con->bounce_page) {
+                               pr_err("failed to allocate bounce page\n");
+                               return -ENOMEM;
+                       }
+               }
+
+               bv.bv_page = con->bounce_page;
+               bv.bv_offset = 0;
+       }
+       set_in_bvec(con, &bv);
+       con->v2.data_len_remain -= ret;
+       return ret;
+}
+
+static int prepare_sparse_read_data(struct ceph_connection *con)
+{
+       struct ceph_msg *msg = con->in_msg;
+
+       dout("%s: starting sparse read\n", __func__);
+
+       if (WARN_ON_ONCE(!con->ops->sparse_read))
+               return -EOPNOTSUPP;
+
+       if (!con_secure(con))
+               con->in_data_crc = -1;
+
+       reset_in_kvecs(con);
+       con->v2.in_state = IN_S_PREPARE_SPARSE_DATA_CONT;
+       con->v2.data_len_remain = data_len(msg);
+       return prepare_sparse_read_cont(con);
+}
+
 static int prepare_read_tail_plain(struct ceph_connection *con)
 {
        struct ceph_msg *msg = con->in_msg;
@@ -1845,7 +1964,10 @@ static int prepare_read_tail_plain(struct ceph_connection *con)
        }
 
        if (data_len(msg)) {
-               con->v2.in_state = IN_S_PREPARE_READ_DATA;
+               if (msg->sparse_read)
+                       con->v2.in_state = IN_S_PREPARE_SPARSE_DATA;
+               else
+                       con->v2.in_state = IN_S_PREPARE_READ_DATA;
        } else {
                add_in_kvec(con, con->v2.in_buf, CEPH_EPILOGUE_PLAIN_LEN);
                con->v2.in_state = IN_S_HANDLE_EPILOGUE;
@@ -2898,6 +3020,12 @@ static int populate_in_iter(struct ceph_connection *con)
                        prepare_read_enc_page(con);
                        ret = 0;
                        break;
+               case IN_S_PREPARE_SPARSE_DATA:
+                       ret = prepare_sparse_read_data(con);
+                       break;
+               case IN_S_PREPARE_SPARSE_DATA_CONT:
+                       ret = prepare_sparse_read_cont(con);
+                       break;
                case IN_S_HANDLE_EPILOGUE:
                        ret = handle_epilogue(con);
                        break;
@@ -3489,6 +3617,23 @@ static void revoke_at_prepare_read_enc_page(struct ceph_connection *con)
        con->v2.in_state = IN_S_FINISH_SKIP;
 }
 
+static void revoke_at_prepare_sparse_data(struct ceph_connection *con)
+{
+       int resid;  /* current piece of data */
+       int remaining;
+
+       WARN_ON(con_secure(con));
+       WARN_ON(!data_len(con->in_msg));
+       WARN_ON(!iov_iter_is_bvec(&con->v2.in_iter));
+       resid = iov_iter_count(&con->v2.in_iter);
+       dout("%s con %p resid %d\n", __func__, con, resid);
+
+       remaining = CEPH_EPILOGUE_PLAIN_LEN + con->v2.data_len_remain;
+       con->v2.in_iter.count -= resid;
+       set_in_skip(con, resid + remaining);
+       con->v2.in_state = IN_S_FINISH_SKIP;
+}
+
 static void revoke_at_handle_epilogue(struct ceph_connection *con)
 {
        int resid;
@@ -3505,6 +3650,7 @@ static void revoke_at_handle_epilogue(struct ceph_connection *con)
 void ceph_con_v2_revoke_incoming(struct ceph_connection *con)
 {
        switch (con->v2.in_state) {
+       case IN_S_PREPARE_SPARSE_DATA:
        case IN_S_PREPARE_READ_DATA:
                revoke_at_prepare_read_data(con);
                break;
@@ -3514,6 +3660,9 @@ void ceph_con_v2_revoke_incoming(struct ceph_connection *con)
        case IN_S_PREPARE_READ_ENC_PAGE:
                revoke_at_prepare_read_enc_page(con);
                break;
+       case IN_S_PREPARE_SPARSE_DATA_CONT:
+               revoke_at_prepare_sparse_data(con);
+               break;
        case IN_S_HANDLE_EPILOGUE:
                revoke_at_handle_epilogue(con);
                break;