diff options
author | 李通洲 <carter.li@eoitek.com> | 2019-09-21 09:43:43 +0800 |
---|---|---|
committer | 李通洲 <carter.li@eoitek.com> | 2019-09-22 09:48:02 +0800 |
commit | a05c7a6c8958e99bbe6b69a50c742ef896de587c (patch) | |
tree | 497d63cd86431f0c215a54c822ce3e1ea1504907 /examples | |
parent | e66e2b31980c67ba151dd22069929b0091fabc64 (diff) | |
download | liburing-a05c7a6c8958e99bbe6b69a50c742ef896de587c.tar.gz liburing-a05c7a6c8958e99bbe6b69a50c742ef896de587c.tar.bz2 |
examples/ucontext-cp: more complex example to show how ucontext works
Allow multiple files being copied in one command, and print more logs
Also add timeout to indicate that different coroutines run in parallel actually
Signed-off-by: 李通洲 <carter.li@eoitek.com>
Diffstat (limited to 'examples')
-rw-r--r-- | examples/ucontext-cp.c | 181 |
1 files changed, 130 insertions, 51 deletions
diff --git a/examples/ucontext-cp.c b/examples/ucontext-cp.c index a7cfd44..211c271 100644 --- a/examples/ucontext-cp.c +++ b/examples/ucontext-cp.c @@ -1,6 +1,7 @@ /* * gcc -Wall -O2 -D_GNU_SOURCE -o ucontext-cp ucontext-cp.c -luring */ +#define _POSIX_C_SOURCE 199309L #include <stdio.h> #include <fcntl.h> #include <string.h> @@ -13,20 +14,25 @@ #include <inttypes.h> #include <sys/types.h> #include <sys/ioctl.h> +#include <sys/timerfd.h> +#include <sys/poll.h> #include "liburing.h" #define QD 64 -#define BS (32*1024) +#define BS 1024 + +#define SIGSTKSZ 8192 typedef struct { - struct io_uring ring; + struct io_uring *ring; unsigned char stack_buf[SIGSTKSZ]; ucontext_t ctx_main, ctx_fnew; } async_context; typedef struct { async_context *pctx; - int ret; + int *psuccess; + int *pfailure; int infd; int outfd; } arguments_bundle; @@ -39,7 +45,7 @@ static ssize_t await_##operation( \ unsigned int nr_vecs, \ off_t offset) \ { \ - struct io_uring_sqe *sqe = io_uring_get_sqe(&pctx->ring); \ + struct io_uring_sqe *sqe = io_uring_get_sqe(pctx->ring); \ struct io_uring_cqe *cqe; \ \ if (!sqe) \ @@ -47,11 +53,10 @@ static ssize_t await_##operation( \ \ io_uring_prep_##operation(sqe, fd, ioves, nr_vecs, offset); \ io_uring_sqe_set_data(sqe, pctx); \ - io_uring_submit(&pctx->ring); \ swapcontext(&pctx->ctx_fnew, &pctx->ctx_main); \ - if (io_uring_peek_cqe(&pctx->ring, &cqe) < 0) \ - return -1; \ - io_uring_cqe_seen(&pctx->ring, cqe); \ + io_uring_peek_cqe(pctx->ring, &cqe); \ + assert(cqe); \ + io_uring_cqe_seen(pctx->ring, cqe); \ \ return cqe->res; \ } @@ -60,16 +65,51 @@ DEFINE_AWAIT_OP(readv) DEFINE_AWAIT_OP(writev) #undef DEFINE_AWAIT_OP -static int setup_context(unsigned entries, async_context *pctx) -{ - int ret; +int await_poll(async_context *pctx, int fd, short poll_mask) { + struct io_uring_sqe *sqe = io_uring_get_sqe(pctx->ring); + struct io_uring_cqe *cqe; + if (!sqe) + return -1; - ret = io_uring_queue_init(entries, &pctx->ring, 0); - if (ret < 0) { - fprintf(stderr, "queue_init: %s\n", strerror(-ret)); + io_uring_prep_poll_add(sqe, fd, poll_mask); + io_uring_sqe_set_data(sqe, pctx); + swapcontext(&pctx->ctx_fnew, &pctx->ctx_main); + io_uring_peek_cqe(pctx->ring, &cqe); + assert(cqe); + io_uring_cqe_seen(pctx->ring, cqe); + + return cqe->res; +} + +int await_delay(async_context *pctx, time_t seconds) { + struct itimerspec exp = { + .it_interval = {}, + .it_value = { seconds, 0 }, + }; + int tfd = timerfd_create(CLOCK_MONOTONIC, TFD_NONBLOCK); + if (tfd < 0) { + perror("timerfd_create"); + return -1; + } + if (timerfd_settime(tfd, 0, &exp, NULL)) { + perror("timerfd_settime"); + close(tfd); return -1; } + int ret = await_poll(pctx, tfd, POLLIN); + assert(ret == POLLIN); + + close(tfd); + + return 0; +} + +static int setup_context(async_context *pctx, struct io_uring *ring) +{ + int ret; + + pctx->ring = ring; ret = getcontext(&pctx->ctx_fnew); if (ret < 0) { perror("getcontext"); @@ -89,14 +129,17 @@ static int copy_file(async_context *pctx, int infd, int outfd, struct iovec* pio for (;;) { ssize_t bytes_read; + printf("%d->%d: readv %ld bytes from %ld\n", infd, outfd, (long) piov->iov_len, (long) offset); if ((bytes_read = await_readv(pctx, infd, piov, 1, offset)) < 0) { perror("await_readv"); return 1; } if (bytes_read == 0) return 0; + piov->iov_len = bytes_read; + printf("%d->%d: writev %ld bytes from %ld\n", infd, outfd, (long) piov->iov_len, (long) offset); if (await_writev(pctx, outfd, piov, 1, offset) != bytes_read) { perror("await_writev"); return 1; @@ -104,75 +147,111 @@ static int copy_file(async_context *pctx, int infd, int outfd, struct iovec* pio if (bytes_read < BS) return 0; offset += bytes_read; + + printf("%d->%d: wait %ds\n", infd, outfd, 1); + await_delay(pctx, 1); } } -static void copy_file_wrapper(arguments_bundle *pbundle) { +static void copy_file_wrapper(arguments_bundle *pbundle) +{ struct iovec iov = { .iov_base = malloc(BS), .iov_len = BS, }; async_context *pctx = pbundle->pctx; - pbundle->ret = copy_file(pctx, pbundle->infd, pbundle->outfd, &iov); + int ret = copy_file(pctx, pbundle->infd, pbundle->outfd, &iov); + + printf("%d->%d: done with ret code %d\n", pbundle->infd, pbundle->outfd, ret); + + if (ret == 0) { + ++*pbundle->psuccess; + } else { + ++*pbundle->pfailure; + } free(iov.iov_base); + close(pbundle->infd); + close(pbundle->outfd); + free(pbundle->pctx); + free(pbundle); + swapcontext(&pctx->ctx_fnew, &pctx->ctx_main); } int main(int argc, char *argv[]) { - async_context ctx; - int infd, outfd; - struct io_uring_cqe *cqe; + int ret; if (argc < 3) { - printf("%s: infile outfile\n", argv[0]); + fprintf(stderr, "%s: infile1 outfile1 [infile2 outfile2 [...]]\n", argv[0]); return 1; } - infd = open(argv[1], O_RDONLY); - if (infd < 0) { - perror("open infile"); - return 1; - } - outfd = open(argv[2], O_WRONLY | O_CREAT | O_TRUNC, 0644); - if (outfd < 0) { - perror("open outfile"); - return 1; + struct io_uring ring; + ret = io_uring_queue_init(QD, &ring, 0); + if (ret < 0) { + fprintf(stderr, "queue_init: %s\n", strerror(-ret)); + return -1; } - if (setup_context(QD, &ctx)) - return 1; + int req_count = (argc - 1) / 2; - arguments_bundle bundle = { - .pctx = &ctx, - .ret = -1, - .infd = infd, - .outfd = outfd, - }; + printf("copying %d files...\n", req_count); - makecontext(&ctx.ctx_fnew, (void (*)(void)) copy_file_wrapper, 1, &bundle); + int success = 0, failure = 0; - if (swapcontext(&ctx.ctx_main, &ctx.ctx_fnew)) { - perror("swapcontext"); - return 1; + for (int i = 1; i < argc; i += 2) { + async_context *pctx = malloc(sizeof(*pctx)); + + if (!pctx || setup_context(pctx, &ring)) + return 1; + + int infd = open(argv[i], O_RDONLY); + if (infd < 0) { + perror("open infile"); + return 1; + } + int outfd = open(argv[i + 1], O_WRONLY | O_CREAT | O_TRUNC, 0644); + if (outfd < 0) { + perror("open outfile"); + return 1; + } + + arguments_bundle *pbundle = malloc(sizeof(*pbundle)); + pbundle->pctx = pctx; + pbundle->psuccess = &success; + pbundle->pfailure = &failure; + pbundle->infd = infd; + pbundle->outfd = outfd; + + makecontext(&pctx->ctx_fnew, (void (*)(void)) copy_file_wrapper, 1, pbundle); + + if (swapcontext(&pctx->ctx_main, &pctx->ctx_fnew)) { + perror("swapcontext"); + return 1; + } } /* event loop */ - while (bundle.ret == -1) { - int ret; - async_context* pctx; + while (success + failure < req_count) { + struct io_uring_cqe *cqe; /* usually be timed waiting */ - ret = io_uring_wait_cqe(&ctx.ring, &cqe); + ret = io_uring_submit_and_wait(&ring, 1); + if (ret < 0) { + fprintf(stderr, "submit_and_wait: %s\n", strerror(-ret)); + return 1; + } + + ret = io_uring_wait_cqe(&ring, &cqe); if (ret < 0) { fprintf(stderr, "wait_cqe: %s\n", strerror(-ret)); return 1; } - pctx = io_uring_cqe_get_data(cqe); - assert(pctx == &ctx); + async_context *pctx = io_uring_cqe_get_data(cqe); if (swapcontext(&pctx->ctx_main, &pctx->ctx_fnew)) { perror("swapcontext"); @@ -180,9 +259,9 @@ int main(int argc, char *argv[]) } } - close(outfd); - close(infd); - io_uring_queue_exit(&ctx.ring); + io_uring_queue_exit(&ring); - return 0; + printf("finished with %d success(es) and %d failure(s)\n", success, failure); + + return failure > 0; } |