dm: block_if: add the io_uring support

io_uring is a high-performance asynchronous I/O framework, primarily designed
to improve the efficiency of I/O operations in user-space applications.
This patch enables io_uring in the block_if module. It uses the interfaces
provided by the user-space library `liburing` to interact with io_uring in the
kernel.
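
For reference, here is a minimal standalone sketch (not taken from this patch; the
function name read_with_io_uring and the queue depth of 8 are illustrative) of the
basic liburing submit/complete flow that block_if builds on. It links with -luring
and trims error handling to the essentials:

    /* Submit one readv request through io_uring and wait for its completion. */
    #include <liburing.h>
    #include <stdio.h>
    #include <sys/types.h>
    #include <sys/uio.h>

    int read_with_io_uring(int fd, struct iovec *iov, int iovcnt, off_t off)
    {
            struct io_uring ring;
            struct io_uring_sqe *sqe;
            struct io_uring_cqe *cqe;
            int ret;

            if (io_uring_queue_init(8, &ring, 0) < 0)       /* 8 entries, no flags */
                    return -1;

            sqe = io_uring_get_sqe(&ring);                  /* NULL when the SQ is full */
            if (!sqe) {
                    io_uring_queue_exit(&ring);
                    return -1;
            }
            io_uring_prep_readv(sqe, fd, iov, iovcnt, off); /* describe the request */
            io_uring_sqe_set_data(sqe, iov);                /* user cookie, echoed in the CQE */
            io_uring_submit(&ring);                         /* hand the SQE to the kernel */

            ret = io_uring_wait_cqe(&ring, &cqe);           /* block until a completion arrives */
            if (ret == 0) {
                    printf("res=%d data=%p\n", cqe->res, io_uring_cqe_get_data(cqe));
                    io_uring_cqe_seen(&ring, cqe);          /* mark the CQE as consumed */
            }
            io_uring_queue_exit(&ring);
            return ret;
    }

In block_if the same calls are driven asynchronously: completions are reaped when
the ring fd becomes readable in the iothread, instead of via io_uring_wait_cqe.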

To build acrn-dm with io_uring support, the `liburing-dev` package needs to be
installed. For example, on Ubuntu 22.04 it can be installed as follows:
        sudo apt install liburing-dev

To support both the thread pool mechanism and the io_uring mechanism, an
acrn-dm option `aio` is introduced. By default, the thread pool mechanism is
selected.
- Example to use io_uring:
  `add_virtual_device    9 virtio-blk iothread,mq=2,/dev/nvme1n1,writeback,aio=io_uring`
- Example to use thread pool:
  `add_virtual_device    9 virtio-blk iothread,mq=2,/dev/nvme1n1,writeback,aio=threads`
- Example to use thread pool (by default):
  `add_virtual_device    9 virtio-blk iothread,mq=2,/dev/nvme1n1,writeback`

v2 -> v3:
 * Update iothread_handler
    - Use the unified eventfd interfaces to read the counter value of the
      ioeventfd.
    - Remove the while loop that read the ioeventfd repeatedly. It is not
      necessary, because a single read resets the counter value to 0
      (see the sketch after this v2 -> v3 list).
 * Update iou_submit_sqe to return an error code
   The caller of iou_submit_sqe shall check the return value.
   If there is no available submission queue entry (SQE) in the submission
   queue, the submission loop needs to be broken, because a request can only
   be submitted when an SQE is available.
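
For reference, a small standalone sketch (not part of this patch) of the eventfd
counter semantics the iothread_handler change relies on: without EFD_SEMAPHORE,
a single eventfd_read() returns the accumulated counter and resets it to 0, so
no drain loop is needed.

    /* Demonstrate that one eventfd_read() collects all pending writes at once. */
    #include <sys/eventfd.h>
    #include <stdio.h>
    #include <unistd.h>

    int main(void)
    {
            eventfd_t val;
            int efd = eventfd(0, EFD_NONBLOCK);

            if (efd < 0)
                    return 1;

            eventfd_write(efd, 1);  /* three notifications accumulate in the counter */
            eventfd_write(efd, 1);
            eventfd_write(efd, 1);

            if (eventfd_read(efd, &val) == 0)
                    printf("first read : %llu\n", (unsigned long long)val);   /* prints 3 */

            if (eventfd_read(efd, &val) < 0)
                    printf("second read: counter already 0, would block\n");  /* EAGAIN */

            close(efd);
            return 0;
    }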

v1 -> v2:
 * Move the logic of reading out the ioeventfd from iothread.c to virtio.c,
   because it is specific to virtqueue handling.

Tracked-On: 

Signed-off-by: Shiqing Gao <shiqing.gao@intel.com>
Acked-by: Wang, Yu1 <yu1.wang@intel.com>
Authored by Shiqing Gao on 2024-05-28 15:30:13 +08:00; committed by acrnsi-robot
parent edb392e7ed
commit fed8ce513c
4 changed files with 377 additions and 29 deletions

@@ -80,6 +80,7 @@ LDFLAGS += -L$(TOOLS_OUT)/services
LIBS = -lrt
LIBS += -lpthread
LIBS += -lcrypto
LIBS += -luring
LIBS += -lpciaccess
LIBS += -lusb-1.0
LIBS += -lacrn-mngr

@@ -21,7 +21,7 @@
#define MEVENT_MAX 64
#define MAX_EVENT_NUM 64
struct iothread_ctx {
pthread_t tid;
int epfd;
@@ -35,25 +35,22 @@ io_thread(void *arg)
{
struct epoll_event eventlist[MEVENT_MAX];
struct iothread_mevent *aevp;
int i, n, status;
char buf[MAX_EVENT_NUM];
int i, n;
while(ioctx.started) {
n = epoll_wait(ioctx.epfd, eventlist, MEVENT_MAX, -1);
if (n < 0) {
if (errno == EINTR)
pr_info("%s: exit from epoll_wait\n", __func__);
else
if (errno == EINTR) {
/* EINTR may happen when io_uring fd is monitored, it is harmless. */
continue;
} else {
pr_err("%s: return from epoll wait with errno %d\r\n", __func__, errno);
break;
break;
}
}
for (i = 0; i < n; i++) {
aevp = eventlist[i].data.ptr;
if (aevp && aevp->run) {
/* Mitigate the epoll_wait repeat cycles by reading out the events as more as possible.*/
do {
status = read(aevp->fd, buf, sizeof(buf));
} while (status == MAX_EVENT_NUM);
(*aevp->run)(aevp->arg);
}
}

@@ -41,12 +41,14 @@
#include <pthread.h>
#include <signal.h>
#include <unistd.h>
#include <liburing.h>
#include "dm.h"
#include "block_if.h"
#include "ahci.h"
#include "dm_string.h"
#include "log.h"
#include "iothread.h"
/*
* Notes:
@@ -66,6 +68,12 @@
#define BLOCKIF_MAXREQ (64 + BLOCKIF_NUMTHR)
#define MAX_DISCARD_SEGMENT 256
#define AIO_MODE_THREAD_POOL 0
#define AIO_MODE_IO_URING 1
/* the max number of entries for the io_uring submission/completion queue */
#define MAX_IO_URING_ENTRIES 256
/*
* Debug printf
*/
@@ -110,9 +118,25 @@ struct blockif_queue {
TAILQ_HEAD(, blockif_elem) busyq;
struct blockif_elem reqs[BLOCKIF_MAXREQ];
int in_flight;
struct io_uring ring;
struct iothread_mevent iomvt;
struct blockif_ctxt *bc;
};
struct blockif_ops {
int aio_mode;
int (*init)(struct blockif_queue *, char *);
void (*deinit)(struct blockif_queue *);
void (*mutex_lock)(pthread_mutex_t *);
void (*mutex_unlock)(pthread_mutex_t *);
void (*request)(struct blockif_queue *);
};
struct blockif_ctxt {
int fd;
int isblk;
@@ -131,6 +155,9 @@ struct blockif_ctxt {
struct blockif_queue *bqs;
int bq_num;
int aio_mode;
const struct blockif_ops *ops;
/* write cache enable */
uint8_t wce;
};
@@ -496,12 +523,296 @@ sub_file_unlock(struct blockif_ctxt *bc)
}
}
static int
thread_pool_init(struct blockif_queue *bq, char *tag)
{
int i;
char tname[MAXCOMLEN + 1];
for (i = 0; i < BLOCKIF_NUMTHR; i++) {
if (snprintf(tname, sizeof(tname), "%s-%d",
tag, i) >= sizeof(tname)) {
pr_err("blk thread name too long");
}
pthread_create(&bq->btid[i], NULL, blockif_thr, bq);
pthread_setname_np(bq->btid[i], tname);
}
return 0;
}
static void
thread_pool_deinit(struct blockif_queue *bq)
{
int i;
void *jval;
for (i = 0; i < BLOCKIF_NUMTHR; i++)
pthread_join(bq->btid[i], &jval);
}
static inline void
thread_pool_mutex_lock(pthread_mutex_t *mutex)
{
pthread_mutex_lock(mutex);
}
static inline void
thread_pool_mutex_unlock(pthread_mutex_t *mutex)
{
pthread_mutex_unlock(mutex);
}
static void
thread_pool_request(struct blockif_queue *bq)
{
pthread_cond_signal(&bq->cond);
}
static struct blockif_ops blockif_ops_thread_pool = {
.aio_mode = AIO_MODE_THREAD_POOL,
.init = thread_pool_init,
.deinit = thread_pool_deinit,
.mutex_lock = thread_pool_mutex_lock,
.mutex_unlock = thread_pool_mutex_unlock,
.request = thread_pool_request,
};
static bool
is_io_uring_supported_op(enum blockop op)
{
return ((op == BOP_READ) || (op == BOP_WRITE) || (op == BOP_FLUSH));
}
static int
iou_submit_sqe(struct blockif_queue *bq, struct blockif_elem *be)
{
int ret;
struct io_uring *ring = &bq->ring;
struct io_uring_sqe *sqes = io_uring_get_sqe(ring);
struct blockif_req *br = be->req;
struct blockif_ctxt *bc = bq->bc;
if (!sqes) {
pr_err("%s: io_uring_get_sqe fails. NO available submission queue entry. \n", __func__);
return -1;
}
switch (be->op) {
case BOP_READ:
io_uring_prep_readv(sqes, bc->fd, br->iov, br->iovcnt, br->offset + bc->sub_file_start_lba);
break;
case BOP_WRITE:
io_uring_prep_writev(sqes, bc->fd, br->iov, br->iovcnt, br->offset + bc->sub_file_start_lba);
break;
case BOP_FLUSH:
io_uring_prep_fsync(sqes, bc->fd, IORING_FSYNC_DATASYNC);
break;
default:
/* is_io_uring_supported_op guarantees that this case will not occur */
break;
}
io_uring_sqe_set_data(sqes, be);
bq->in_flight++;
ret = io_uring_submit(ring);
if (ret < 0) {
pr_err("%s: io_uring_submit fails, error %s \n", __func__, strerror(-ret));
}
return ret;
}
static void
iou_submit(struct blockif_queue *bq)
{
int err = 0;
struct blockif_elem *be;
struct blockif_req *br;
struct blockif_ctxt *bc = bq->bc;
while (blockif_dequeue(bq, 0, &be)) {
if (is_io_uring_supported_op(be->op)) {
err = iou_submit_sqe(bq, be);
/*
* -1 means that there is NO available submission queue entry (SQE) in the submission queue.
* Break the while loop here. Request can only be submitted when SQE is available.
*/
if (err == -1) {
break;
}
} else {
br = be->req;
if (be->op == BOP_DISCARD) {
err = blockif_process_discard(bc, br);
} else {
pr_err("%s: op %d is not supported \n", __func__, be->op);
err = EINVAL;
}
be->status = BST_DONE;
(*br->callback)(br, err);
blockif_complete(bq, be);
}
}
return;
}
static void
iou_process_completions(struct blockif_queue *bq)
{
struct io_uring_cqe *cqes = NULL;
struct blockif_elem *be;
struct blockif_req *br;
struct io_uring *ring = &bq->ring;
while (io_uring_peek_cqe(ring, &cqes) == 0) {
if (!cqes) {
pr_err("%s: cqes is NULL \n", __func__);
break;
}
be = io_uring_cqe_get_data(cqes);
bq->in_flight--;
io_uring_cqe_seen(ring, cqes);
cqes = NULL;
if (!be) {
pr_err("%s: be is NULL \n", __func__);
break;
}
br = be->req;
if (!br) {
pr_err("%s: br is NULL \n", __func__);
break;
}
be->status = BST_DONE;
(*br->callback)(br, 0);
blockif_complete(bq, be);
}
return;
}
static void
iou_submit_and_reap(struct blockif_queue *bq)
{
iou_submit(bq);
if (bq->in_flight > 0) {
iou_process_completions(bq);
}
return;
}
static void
iou_reap_and_submit(struct blockif_queue *bq)
{
iou_process_completions(bq);
if (!TAILQ_EMPTY(&bq->pendq)) {
iou_submit(bq);
}
return;
}
static void
iou_completion_cb(void *arg)
{
struct blockif_queue *bq = (struct blockif_queue *)arg;
iou_reap_and_submit(bq);
}
static int
iou_set_iothread(struct blockif_queue *bq)
{
int fd = bq->ring.ring_fd;
int ret = 0;
bq->iomvt.arg = bq;
bq->iomvt.run = iou_completion_cb;
bq->iomvt.fd = fd;
ret = iothread_add(fd, &bq->iomvt);
if (ret < 0) {
pr_err("%s: iothread_add fails, error %d \n", __func__, ret);
}
return ret;
}
static int
iou_del_iothread(struct blockif_queue *bq)
{
int fd = bq->ring.ring_fd;
int ret = 0;
ret = iothread_del(fd);
if (ret < 0) {
pr_err("%s: iothread_del fails, error %d \n", __func__, ret);
}
return ret;
}
static int
iou_init(struct blockif_queue *bq, char *tag __attribute__((unused)))
{
int ret = 0;
struct io_uring *ring = &bq->ring;
/*
* - When Service VM owns more dedicated cores, IORING_SETUP_SQPOLL and IORING_SETUP_IOPOLL, along with NVMe
* polling mechanism could benefit the performance.
* - When Service VM owns limited cores, the benefit of polling is also limited.
* As in most of the use cases, Service VM does not own much dedicated cores, IORING_SETUP_SQPOLL and
* IORING_SETUP_IOPOLL are not enabled by default.
*/
ret = io_uring_queue_init(MAX_IO_URING_ENTRIES, ring, 0);
if (ret < 0) {
pr_err("%s: io_uring_queue_init fails, error %d \n", __func__, ret);
} else {
ret = iou_set_iothread(bq);
if (ret < 0) {
pr_err("%s: iou_set_iothread fails \n", __func__);
}
}
return ret;
}
static void
iou_deinit(struct blockif_queue *bq)
{
struct io_uring *ring = &bq->ring;
iou_del_iothread(bq);
io_uring_queue_exit(ring);
}
static inline void iou_mutex_lock(pthread_mutex_t *mutex __attribute__((unused))) {}
static inline void iou_mutex_unlock(pthread_mutex_t *mutex __attribute__((unused))) {}
static struct blockif_ops blockif_ops_iou = {
.aio_mode = AIO_MODE_IO_URING,
.init = iou_init,
.deinit = iou_deinit,
.mutex_lock = iou_mutex_lock,
.mutex_unlock = iou_mutex_unlock,
.request = iou_submit_and_reap,
};
struct blockif_ctxt *
blockif_open(const char *optstr, const char *ident, int queue_num)
{
char tname[MAXCOMLEN + 1];
/* char name[MAXPATHLEN]; */
char tag[MAXCOMLEN + 1];
char *nopt, *xopts, *cp;
struct blockif_ctxt *bc = NULL;
struct stat sbuf;
@@ -516,6 +827,7 @@ blockif_open(const char *optstr, const char *ident, int queue_num)
int sub_file_assign;
int max_discard_sectors, max_discard_seg, discard_sector_alignment;
off_t probe_arg[] = {0, 0};
int aio_mode;
pthread_once(&blockif_once, blockif_init);
@@ -531,6 +843,9 @@ blockif_open(const char *optstr, const char *ident, int queue_num)
max_discard_seg = -1;
discard_sector_alignment = -1;
/* default mode is thread pool */
aio_mode = AIO_MODE_THREAD_POOL;
/* writethru is on by default */
writeback = 0;
@@ -592,6 +907,19 @@ blockif_open(const char *optstr, const char *ident, int queue_num)
sub_file_assign = 1;
else
goto err;
} else if (!strncmp(cp, "aio", strlen("aio"))) {
/* aio=threads or aio=io_uring */
strsep(&cp, "=");
if (cp != NULL) {
if (!strncmp(cp, "threads", strlen("threads"))) {
aio_mode = AIO_MODE_THREAD_POOL;
} else if (!strncmp(cp, "io_uring", strlen("io_uring"))) {
aio_mode = AIO_MODE_IO_URING;
} else {
pr_err("Invalid aio option, only support threads or io_uring \"%s\"\n", cp);
goto err;
}
}
} else {
pr_err("Invalid device option \"%s\"\n", cp);
goto err;
@@ -750,6 +1078,14 @@ blockif_open(const char *optstr, const char *ident, int queue_num)
bc->psectsz = psectsz;
bc->psectoff = psectoff;
bc->wce = writeback;
bc->aio_mode = aio_mode;
if (bc->aio_mode == AIO_MODE_IO_URING) {
bc->ops = &blockif_ops_iou;
} else {
bc->ops = &blockif_ops_thread_pool;
}
bc->bq_num = queue_num;
bc->bqs = calloc(bc->bq_num, sizeof(struct blockif_queue));
if (bc->bqs == NULL) {
@@ -771,13 +1107,15 @@ blockif_open(const char *optstr, const char *ident, int queue_num)
TAILQ_INSERT_HEAD(&bq->freeq, &bq->reqs[i], link);
}
for (i = 0; i < BLOCKIF_NUMTHR; i++) {
if (snprintf(tname, sizeof(tname), "blk-%s-%d-%d",
ident, j, i) >= sizeof(tname)) {
pr_err("blk thread name too long");
if (snprintf(tag, sizeof(tag), "blk-%s-%d",
ident, j) >= sizeof(tag)) {
pr_err("blk thread tag too long");
}
if (bc->ops->init) {
if (bc->ops->init(bq, tag) < 0) {
goto err;
}
pthread_create(&bq->btid[i], NULL, blockif_thr, bq);
pthread_setname_np(bq->btid[i], tname);
}
}
@@ -817,14 +1155,19 @@ blockif_request(struct blockif_ctxt *bc, struct blockif_req *breq,
}
bq = bc->bqs + breq->qidx;
pthread_mutex_lock(&bq->mtx);
if (bc->ops->mutex_lock) {
bc->ops->mutex_lock(&bq->mtx);
}
if (!TAILQ_EMPTY(&bq->freeq)) {
/*
* Enqueue and inform the block i/o thread
* that there is work available
*/
if (blockif_enqueue(bq, breq, op))
pthread_cond_signal(&bq->cond);
if (blockif_enqueue(bq, breq, op)) {
if (bc->ops->request) {
bc->ops->request(bq);
}
}
} else {
/*
* Callers are not allowed to enqueue more than
@@ -834,8 +1177,9 @@
*/
err = E2BIG;
}
pthread_mutex_unlock(&bq->mtx);
if (bc->ops->mutex_unlock) {
bc->ops->mutex_unlock(&bq->mtx);
}
return err;
}
@@ -948,8 +1292,7 @@ blockif_cancel(struct blockif_ctxt *bc, struct blockif_req *breq)
int
blockif_close(struct blockif_ctxt *bc)
{
void *jval;
int i, j;
int j;
sub_file_unlock(bc);
@@ -964,10 +1307,10 @@ blockif_close(struct blockif_ctxt *bc)
pthread_cond_broadcast(&bq->cond);
pthread_mutex_unlock(&bq->mtx);
for (i = 0; i < BLOCKIF_NUMTHR; i++)
pthread_join(bq->btid[i], &jval);
if (bc->ops->deinit) {
bc->ops->deinit(bq);
}
}
/* XXX Cancel queued i/o's ??? */
/*

@@ -66,6 +66,13 @@ void iothread_handler(void *arg)
struct virtio_base *base = viothrd->base;
int idx = viothrd->idx;
struct virtio_vq_info *vq = &base->queues[idx];
eventfd_t val;
/* Mitigate the epoll_wait repeat cycles by reading out the event */
if (eventfd_read(vq->viothrd.iomvt.fd, &val) == -1) {
pr_err("%s: eventfd_read fails \r\n", __func__);
return;
}
if (viothrd->iothread_run) {
pthread_mutex_lock(&vq->mtx);