libceph: add sparse read support to msgr1
author Jeff Layton <jlayton@kernel.org>
Thu, 24 Mar 2022 17:33:06 +0000 (13:33 -0400)
committer Ilya Dryomov <idryomov@gmail.com>
Tue, 22 Aug 2023 07:01:47 +0000 (09:01 +0200)
Add 2 new fields to ceph_connection_v1_info to track the necessary info
in sparse reads. Skip initializing the cursor for a sparse read.

Break out read_partial_message_section into a wrapper around a new
read_partial_message_chunk function that doesn't zero out the crc first.

Add new helper functions to drive receiving into the destinations
provided by the sparse_read state machine.

Signed-off-by: Jeff Layton <jlayton@kernel.org>
Reviewed-by: Xiubo Li <xiubli@redhat.com>
Reviewed-and-tested-by: Luís Henriques <lhenriques@suse.de>
Reviewed-by: Milind Changire <mchangir@redhat.com>
Signed-off-by: Ilya Dryomov <idryomov@gmail.com>
include/linux/ceph/messenger.h
net/ceph/messenger_v1.c

index 8a6938fa324e0d3f2d2d4bcc447b6e6c7d348ed1..9fd7255172ad5793457c07f0b29d3f07c2216984 100644 (file)
@@ -336,6 +336,10 @@ struct ceph_connection_v1_info {
 
        int in_base_pos;     /* bytes read */
 
+       /* sparse reads */
+       struct kvec in_sr_kvec; /* current location to receive into */
+       u64 in_sr_len;          /* amount of data in this extent */
+
        /* message in temps */
        u8 in_tag;           /* protocol control byte */
        struct ceph_msg_header in_hdr;
index 3d57bb48a2b49a584858422ae7dbf205bf5d2f66..f9a50d7f0d204639f821835d341bb87c13a80333 100644 (file)
@@ -159,9 +159,9 @@ static size_t sizeof_footer(struct ceph_connection *con)
 
 static void prepare_message_data(struct ceph_msg *msg, u32 data_len)
 {
-       /* Initialize data cursor */
-
-       ceph_msg_data_cursor_init(&msg->cursor, msg, data_len);
+       /*
+        * Initialize data cursor if it's not a sparse read.
+        * NOTE(review): nothing in this function sets up the cursor when
+        * msg->sparse_read is set; presumably the ->sparse_read() op does
+        * that before any data is received -- confirm.
+        */
+       if (!msg->sparse_read)
+               ceph_msg_data_cursor_init(&msg->cursor, msg, data_len);
 }
 
 /*
@@ -960,9 +960,9 @@ static void process_ack(struct ceph_connection *con)
        prepare_read_tag(con);
 }
 
-static int read_partial_message_section(struct ceph_connection *con,
-                                       struct kvec *section,
-                                       unsigned int sec_len, u32 *crc)
+static int read_partial_message_chunk(struct ceph_connection *con,
+                                     struct kvec *section,
+                                     unsigned int sec_len, u32 *crc)
 {
        int ret, left;
 
@@ -978,11 +978,91 @@ static int read_partial_message_section(struct ceph_connection *con,
                section->iov_len += ret;
        }
        if (section->iov_len == sec_len)
-               *crc = crc32c(0, section->iov_base, section->iov_len);
+               *crc = crc32c(*crc, section->iov_base, section->iov_len);
 
        return 1;
 }
 
+static inline int read_partial_message_section(struct ceph_connection *con,
+                                              struct kvec *section,
+                                              unsigned int sec_len, u32 *crc)
+{
+       *crc = 0;
+       return read_partial_message_chunk(con, section, sec_len, crc);
+}
+/*
+ * Receive the remainder of the current sparse-read extent into the
+ * message's data pages.
+ *
+ * Loops while cursor->sr_resid is non-zero. Each pass takes the next
+ * page/offset/length from the data cursor, clamps the length to sr_resid,
+ * receives into that page (or into con->bounce_page when the RXBOUNCE
+ * option is set), folds the received bytes into *@crc, advances the cursor
+ * and decrements sr_resid. With RXBOUNCE the received bytes are then copied
+ * from the bounce page to the same offset in the destination page.
+ *
+ * *@crc is updated unconditionally here; whether the result is kept is up
+ * to the caller.
+ *
+ * Returns 1 once sr_resid reaches zero, -ENOMEM if the bounce page cannot
+ * be allocated, or the ceph_tcp_recvpage() result when that is <= 0.
+ */
+static int read_sparse_msg_extent(struct ceph_connection *con, u32 *crc)
+{
+       struct ceph_msg_data_cursor *cursor = &con->in_msg->cursor;
+       bool do_bounce = ceph_test_opt(from_msgr(con->msgr), RXBOUNCE);
+
+       if (do_bounce && unlikely(!con->bounce_page)) { /* allocate on first use */
+               con->bounce_page = alloc_page(GFP_NOIO);
+               if (!con->bounce_page) {
+                       pr_err("failed to allocate bounce page\n");
+                       return -ENOMEM;
+               }
+       }
+
+       while (cursor->sr_resid > 0) {
+               struct page *page, *rpage;
+               size_t off, len;
+               int ret;
+
+               page = ceph_msg_data_next(cursor, &off, &len);
+               rpage = do_bounce ? con->bounce_page : page; /* page actually received into */
+
+               /*
+                * clamp to what remains in extent
+                * NOTE(review): min_t(int, ...) compares both operands as
+                * int; confirm sr_resid always fits.
+                */
+               len = min_t(int, len, cursor->sr_resid);
+               ret = ceph_tcp_recvpage(con->sock, rpage, (int)off, len);
+               if (ret <= 0)
+                       return ret; /* sr_resid keeps the unread count for a later call */
+               *crc = ceph_crc32c_page(*crc, rpage, off, ret);
+               ceph_msg_data_advance(cursor, (size_t)ret);
+               cursor->sr_resid -= ret;
+               if (do_bounce)
+                       memcpy_page(page, off, rpage, off, ret); /* bounce page -> destination page */
+       }
+       return 1;
+}
+/*
+ * Drive reception of the data portion of a message that has sparse_read
+ * set.
+ *
+ * Each pass of the loop first finishes whatever destination is currently
+ * pending: the buffer described by con->v1.in_sr_kvec (in_sr_len bytes) if
+ * its iov_base is set, otherwise the rest of the current extent if
+ * cursor->sr_resid is non-zero. It then clears in_sr_kvec and calls
+ * con->ops->sparse_read() to obtain the next destination. That call's
+ * return value is stored in in_sr_len, and any buffer pointer it supplies
+ * lands in in_sr_kvec.iov_base. The loop continues while the return value
+ * is positive.
+ *
+ * Unless the NOCRC option is set, the running data crc is loaded from
+ * con->in_data_crc on entry and stored back on every exit path.
+ *
+ * Returns 1 when sparse_read() returns 0, the negative value if
+ * sparse_read() returns one, or the <= 0 result of a receive helper.
+ */
+static int read_sparse_msg_data(struct ceph_connection *con)
+{
+       struct ceph_msg_data_cursor *cursor = &con->in_msg->cursor;
+       bool do_datacrc = !ceph_test_opt(from_msgr(con->msgr), NOCRC);
+       u32 crc = 0;
+       int ret = 1; /* stays > 0 when nothing is pending on the first pass */
+
+       if (do_datacrc)
+               crc = con->in_data_crc;
+
+       do {
+               if (con->v1.in_sr_kvec.iov_base)
+                       ret = read_partial_message_chunk(con,
+                                                        &con->v1.in_sr_kvec,
+                                                        con->v1.in_sr_len,
+                                                        &crc);
+               else if (cursor->sr_resid > 0)
+                       ret = read_sparse_msg_extent(con, &crc);
+
+               if (ret <= 0) {
+                       if (do_datacrc)
+                               con->in_data_crc = crc; /* save progress before returning */
+                       return ret;
+               }
+
+               memset(&con->v1.in_sr_kvec, 0, sizeof(con->v1.in_sr_kvec)); /* drop the finished buffer */
+               ret = con->ops->sparse_read(con, cursor,
+                               (char **)&con->v1.in_sr_kvec.iov_base);
+               con->v1.in_sr_len = ret; /* stored even when ret <= 0 */
+       } while (ret > 0);
+
+       if (do_datacrc)
+               con->in_data_crc = crc;
+
+       return ret < 0 ? ret : 1;  /* must return > 0 to indicate success */
+}
+
 static int read_partial_msg_data(struct ceph_connection *con)
 {
        struct ceph_msg_data_cursor *cursor = &con->in_msg->cursor;
@@ -1173,7 +1253,9 @@ static int read_partial_message(struct ceph_connection *con)
                if (!m->num_data_items)
                        return -EIO;
 
-               if (ceph_test_opt(from_msgr(con->msgr), RXBOUNCE))
+               if (m->sparse_read)
+                       ret = read_sparse_msg_data(con);
+               else if (ceph_test_opt(from_msgr(con->msgr), RXBOUNCE))
                        ret = read_partial_msg_data_bounce(con);
                else
                        ret = read_partial_msg_data(con);