summaryrefslogtreecommitdiff
path: root/examples
diff options
context:
space:
mode:
author李通洲 <carter.li@eoitek.com>2019-09-21 09:43:43 +0800
committer李通洲 <carter.li@eoitek.com>2019-09-22 09:48:02 +0800
commita05c7a6c8958e99bbe6b69a50c742ef896de587c (patch)
tree497d63cd86431f0c215a54c822ce3e1ea1504907 /examples
parente66e2b31980c67ba151dd22069929b0091fabc64 (diff)
downloadliburing-a05c7a6c8958e99bbe6b69a50c742ef896de587c.tar.gz
liburing-a05c7a6c8958e99bbe6b69a50c742ef896de587c.tar.bz2
examples/ucontext-cp: more complex example to show how ucontext works
Allow multiple files being copied in one command, and print more logs Also add timeout to indicate that different coroutines run in parallel actually Signed-off-by: 李通洲 <carter.li@eoitek.com>
Diffstat (limited to 'examples')
-rw-r--r--examples/ucontext-cp.c181
1 files changed, 130 insertions, 51 deletions
diff --git a/examples/ucontext-cp.c b/examples/ucontext-cp.c
index a7cfd44..211c271 100644
--- a/examples/ucontext-cp.c
+++ b/examples/ucontext-cp.c
@@ -1,6 +1,7 @@
/*
* gcc -Wall -O2 -D_GNU_SOURCE -o ucontext-cp ucontext-cp.c -luring
*/
+#define _POSIX_C_SOURCE 199309L
#include <stdio.h>
#include <fcntl.h>
#include <string.h>
@@ -13,20 +14,25 @@
#include <inttypes.h>
#include <sys/types.h>
#include <sys/ioctl.h>
+#include <sys/timerfd.h>
+#include <sys/poll.h>
#include "liburing.h"
#define QD 64
-#define BS (32*1024)
+#define BS 1024
+
+#define SIGSTKSZ 8192
typedef struct {
- struct io_uring ring;
+ struct io_uring *ring;
unsigned char stack_buf[SIGSTKSZ];
ucontext_t ctx_main, ctx_fnew;
} async_context;
typedef struct {
async_context *pctx;
- int ret;
+ int *psuccess;
+ int *pfailure;
int infd;
int outfd;
} arguments_bundle;
@@ -39,7 +45,7 @@ static ssize_t await_##operation( \
unsigned int nr_vecs, \
off_t offset) \
{ \
- struct io_uring_sqe *sqe = io_uring_get_sqe(&pctx->ring); \
+ struct io_uring_sqe *sqe = io_uring_get_sqe(pctx->ring); \
struct io_uring_cqe *cqe; \
\
if (!sqe) \
@@ -47,11 +53,10 @@ static ssize_t await_##operation( \
\
io_uring_prep_##operation(sqe, fd, ioves, nr_vecs, offset); \
io_uring_sqe_set_data(sqe, pctx); \
- io_uring_submit(&pctx->ring); \
swapcontext(&pctx->ctx_fnew, &pctx->ctx_main); \
- if (io_uring_peek_cqe(&pctx->ring, &cqe) < 0) \
- return -1; \
- io_uring_cqe_seen(&pctx->ring, cqe); \
+ io_uring_peek_cqe(pctx->ring, &cqe); \
+ assert(cqe); \
+ io_uring_cqe_seen(pctx->ring, cqe); \
\
return cqe->res; \
}
@@ -60,16 +65,51 @@ DEFINE_AWAIT_OP(readv)
DEFINE_AWAIT_OP(writev)
#undef DEFINE_AWAIT_OP
-static int setup_context(unsigned entries, async_context *pctx)
-{
- int ret;
+int await_poll(async_context *pctx, int fd, short poll_mask) {
+ struct io_uring_sqe *sqe = io_uring_get_sqe(pctx->ring);
+ struct io_uring_cqe *cqe;
+ if (!sqe)
+ return -1;
- ret = io_uring_queue_init(entries, &pctx->ring, 0);
- if (ret < 0) {
- fprintf(stderr, "queue_init: %s\n", strerror(-ret));
+ io_uring_prep_poll_add(sqe, fd, poll_mask);
+ io_uring_sqe_set_data(sqe, pctx);
+ swapcontext(&pctx->ctx_fnew, &pctx->ctx_main);
+ io_uring_peek_cqe(pctx->ring, &cqe);
+ assert(cqe);
+ io_uring_cqe_seen(pctx->ring, cqe);
+
+ return cqe->res;
+}
+
+int await_delay(async_context *pctx, time_t seconds) {
+ struct itimerspec exp = {
+ .it_interval = {},
+ .it_value = { seconds, 0 },
+ };
+ int tfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK);
+ if (tfd < 0) {
+ perror("timerfd_create");
+ return -1;
+ }
+ if (timerfd_settime(tfd, 0, &exp, NULL)) {
+ perror("timerfd_settime");
+ close(tfd);
return -1;
}
+ int ret = await_poll(pctx, tfd, POLLIN);
+ assert(ret == POLLIN);
+
+ close(tfd);
+
+ return 0;
+}
+
+static int setup_context(async_context *pctx, struct io_uring *ring)
+{
+ int ret;
+
+ pctx->ring = ring;
ret = getcontext(&pctx->ctx_fnew);
if (ret < 0) {
perror("getcontext");
@@ -89,14 +129,17 @@ static int copy_file(async_context *pctx, int infd, int outfd, struct iovec* pio
for (;;) {
ssize_t bytes_read;
+ printf("%d->%d: readv %ld bytes from %ld\n", infd, outfd, (long) piov->iov_len, (long) offset);
if ((bytes_read = await_readv(pctx, infd, piov, 1, offset)) < 0) {
perror("await_readv");
return 1;
}
if (bytes_read == 0)
return 0;
+
piov->iov_len = bytes_read;
+ printf("%d->%d: writev %ld bytes from %ld\n", infd, outfd, (long) piov->iov_len, (long) offset);
if (await_writev(pctx, outfd, piov, 1, offset) != bytes_read) {
perror("await_writev");
return 1;
@@ -104,75 +147,111 @@ static int copy_file(async_context *pctx, int infd, int outfd, struct iovec* pio
if (bytes_read < BS)
return 0;
offset += bytes_read;
+
+ printf("%d->%d: wait %ds\n", infd, outfd, 1);
+ await_delay(pctx, 1);
}
}
-static void copy_file_wrapper(arguments_bundle *pbundle) {
+static void copy_file_wrapper(arguments_bundle *pbundle)
+{
struct iovec iov = {
.iov_base = malloc(BS),
.iov_len = BS,
};
async_context *pctx = pbundle->pctx;
- pbundle->ret = copy_file(pctx, pbundle->infd, pbundle->outfd, &iov);
+ int ret = copy_file(pctx, pbundle->infd, pbundle->outfd, &iov);
+
+ printf("%d->%d: done with ret code %d\n", pbundle->infd, pbundle->outfd, ret);
+
+ if (ret == 0) {
+ ++*pbundle->psuccess;
+ } else {
+ ++*pbundle->pfailure;
+ }
free(iov.iov_base);
+ close(pbundle->infd);
+ close(pbundle->outfd);
+ free(pbundle->pctx);
+ free(pbundle);
+
swapcontext(&pctx->ctx_fnew, &pctx->ctx_main);
}
int main(int argc, char *argv[])
{
- async_context ctx;
- int infd, outfd;
- struct io_uring_cqe *cqe;
+ int ret;
if (argc < 3) {
- printf("%s: infile outfile\n", argv[0]);
+ fprintf(stderr, "%s: infile1 outfile1 [infile2 outfile2 [...]]\n", argv[0]);
return 1;
}
- infd = open(argv[1], O_RDONLY);
- if (infd < 0) {
- perror("open infile");
- return 1;
- }
- outfd = open(argv[2], O_WRONLY | O_CREAT | O_TRUNC, 0644);
- if (outfd < 0) {
- perror("open outfile");
- return 1;
+ struct io_uring ring;
+ ret = io_uring_queue_init(QD, &ring, 0);
+ if (ret < 0) {
+ fprintf(stderr, "queue_init: %s\n", strerror(-ret));
+ return -1;
}
- if (setup_context(QD, &ctx))
- return 1;
+ int req_count = (argc - 1) / 2;
- arguments_bundle bundle = {
- .pctx = &ctx,
- .ret = -1,
- .infd = infd,
- .outfd = outfd,
- };
+ printf("copying %d files...\n", req_count);
- makecontext(&ctx.ctx_fnew, (void (*)(void)) copy_file_wrapper, 1, &bundle);
+ int success = 0, failure = 0;
- if (swapcontext(&ctx.ctx_main, &ctx.ctx_fnew)) {
- perror("swapcontext");
- return 1;
+ for (int i = 1; i < argc; i += 2) {
+ async_context *pctx = malloc(sizeof(*pctx));
+
+ if (!pctx || setup_context(pctx, &ring))
+ return 1;
+
+ int infd = open(argv[i], O_RDONLY);
+ if (infd < 0) {
+ perror("open infile");
+ return 1;
+ }
+ int outfd = open(argv[i + 1], O_WRONLY | O_CREAT | O_TRUNC, 0644);
+ if (outfd < 0) {
+ perror("open outfile");
+ return 1;
+ }
+
+ arguments_bundle *pbundle = malloc(sizeof(*pbundle));
+ pbundle->pctx = pctx;
+ pbundle->psuccess = &success;
+ pbundle->pfailure = &failure;
+ pbundle->infd = infd;
+ pbundle->outfd = outfd;
+
+ makecontext(&pctx->ctx_fnew, (void (*)(void)) copy_file_wrapper, 1, pbundle);
+
+ if (swapcontext(&pctx->ctx_main, &pctx->ctx_fnew)) {
+ perror("swapcontext");
+ return 1;
+ }
}
/* event loop */
- while (bundle.ret == -1) {
- int ret;
- async_context* pctx;
+ while (success + failure < req_count) {
+ struct io_uring_cqe *cqe;
/* usually be timed waiting */
- ret = io_uring_wait_cqe(&ctx.ring, &cqe);
+ ret = io_uring_submit_and_wait(&ring, 1);
+ if (ret < 0) {
+ fprintf(stderr, "submit_and_wait: %s\n", strerror(-ret));
+ return 1;
+ }
+
+ ret = io_uring_wait_cqe(&ring, &cqe);
if (ret < 0) {
fprintf(stderr, "wait_cqe: %s\n", strerror(-ret));
return 1;
}
- pctx = io_uring_cqe_get_data(cqe);
- assert(pctx == &ctx);
+ async_context *pctx = io_uring_cqe_get_data(cqe);
if (swapcontext(&pctx->ctx_main, &pctx->ctx_fnew)) {
perror("swapcontext");
@@ -180,9 +259,9 @@ int main(int argc, char *argv[])
}
}
- close(outfd);
- close(infd);
- io_uring_queue_exit(&ctx.ring);
+ io_uring_queue_exit(&ring);
- return 0;
+ printf("finished with %d success(es) and %d failure(s)\n", success, failure);
+
+ return failure > 0;
}