dm: block_if: add multiple queues support

block_if is the backend of ahci and virtio-blk. Only one queue is
supported by block_if now. Several worker threads are created as
the thread pool for the queue. One BIG mutex is used for the queue
and thread operation. With this patch block_if can support multiple
queues and each queue is backed by several worker threads. blockif_req
can be submited/enqueued into one specified queue. By spliting into
several queues contention from the BIG mutex can be relieved/eliminated.
This is used to support virtio-blk multiple queues feature.

Tracked-On: #8612

Signed-off-by: Jian Jun Chen <jian.jun.chen@intel.com>
Acked-by: Wang, Yu1 <yu1.wang@intel.com>
This commit is contained in:
Jian Jun Chen 2023-07-11 17:47:23 +08:00 committed by acrnsi-robot
parent 562c22fb4e
commit edb392e7ed
4 changed files with 127 additions and 78 deletions

View File

@ -97,6 +97,22 @@ struct blockif_elem {
off_t block;
};
struct blockif_queue {
int closing;
pthread_t btid[BLOCKIF_NUMTHR];
pthread_mutex_t mtx;
pthread_cond_t cond;
/* Request elements and free/pending/busy queues */
TAILQ_HEAD(, blockif_elem) freeq;
TAILQ_HEAD(, blockif_elem) pendq;
TAILQ_HEAD(, blockif_elem) busyq;
struct blockif_elem reqs[BLOCKIF_MAXREQ];
struct blockif_ctxt *bc;
};
struct blockif_ctxt {
int fd;
int isblk;
@ -112,16 +128,8 @@ struct blockif_ctxt {
int max_discard_sectors;
int max_discard_seg;
int discard_sector_alignment;
int closing;
pthread_t btid[BLOCKIF_NUMTHR];
pthread_mutex_t mtx;
pthread_cond_t cond;
/* Request elements and free/pending/busy queues */
TAILQ_HEAD(, blockif_elem) freeq;
TAILQ_HEAD(, blockif_elem) pendq;
TAILQ_HEAD(, blockif_elem) busyq;
struct blockif_elem reqs[BLOCKIF_MAXREQ];
struct blockif_queue *bqs;
int bq_num;
/* write cache enable */
uint8_t wce;
@ -158,19 +166,19 @@ blockif_flush_cache(struct blockif_ctxt *bc)
}
static int
blockif_enqueue(struct blockif_ctxt *bc, struct blockif_req *breq,
blockif_enqueue(struct blockif_queue *bq, struct blockif_req *breq,
enum blockop op)
{
struct blockif_elem *be, *tbe;
off_t off;
int i;
be = TAILQ_FIRST(&bc->freeq);
be = TAILQ_FIRST(&bq->freeq);
if (be == NULL || be->status != BST_FREE) {
WPRINTF(("%s: failed to get element from freeq\n", __func__));
return 0;
}
TAILQ_REMOVE(&bc->freeq, be, link);
TAILQ_REMOVE(&bq->freeq, be, link);
be->req = breq;
be->op = op;
switch (op) {
@ -186,12 +194,12 @@ blockif_enqueue(struct blockif_ctxt *bc, struct blockif_req *breq,
off = 1 << (sizeof(off_t) - 1);
}
be->block = off;
TAILQ_FOREACH(tbe, &bc->pendq, link) {
TAILQ_FOREACH(tbe, &bq->pendq, link) {
if (tbe->block == breq->offset)
break;
}
if (tbe == NULL) {
TAILQ_FOREACH(tbe, &bc->busyq, link) {
TAILQ_FOREACH(tbe, &bq->busyq, link) {
if (tbe->block == breq->offset)
break;
}
@ -200,46 +208,46 @@ blockif_enqueue(struct blockif_ctxt *bc, struct blockif_req *breq,
be->status = BST_PEND;
else
be->status = BST_BLOCK;
TAILQ_INSERT_TAIL(&bc->pendq, be, link);
TAILQ_INSERT_TAIL(&bq->pendq, be, link);
return (be->status == BST_PEND);
}
static int
blockif_dequeue(struct blockif_ctxt *bc, pthread_t t, struct blockif_elem **bep)
blockif_dequeue(struct blockif_queue *bq, pthread_t t, struct blockif_elem **bep)
{
struct blockif_elem *be;
TAILQ_FOREACH(be, &bc->pendq, link) {
TAILQ_FOREACH(be, &bq->pendq, link) {
if (be->status == BST_PEND)
break;
}
if (be == NULL)
return 0;
TAILQ_REMOVE(&bc->pendq, be, link);
TAILQ_REMOVE(&bq->pendq, be, link);
be->status = BST_BUSY;
be->tid = t;
TAILQ_INSERT_TAIL(&bc->busyq, be, link);
TAILQ_INSERT_TAIL(&bq->busyq, be, link);
*bep = be;
return 1;
}
static void
blockif_complete(struct blockif_ctxt *bc, struct blockif_elem *be)
blockif_complete(struct blockif_queue *bq, struct blockif_elem *be)
{
struct blockif_elem *tbe;
if (be->status == BST_DONE || be->status == BST_BUSY)
TAILQ_REMOVE(&bc->busyq, be, link);
TAILQ_REMOVE(&bq->busyq, be, link);
else
TAILQ_REMOVE(&bc->pendq, be, link);
TAILQ_FOREACH(tbe, &bc->pendq, link) {
TAILQ_REMOVE(&bq->pendq, be, link);
TAILQ_FOREACH(tbe, &bq->pendq, link) {
if (tbe->req->offset == be->block)
tbe->status = BST_PEND;
}
be->tid = 0;
be->status = BST_FREE;
be->req = NULL;
TAILQ_INSERT_TAIL(&bc->freeq, be, link);
TAILQ_INSERT_TAIL(&bq->freeq, be, link);
}
static int
@ -327,13 +335,15 @@ blockif_process_discard(struct blockif_ctxt *bc, struct blockif_req *br)
}
static void
blockif_proc(struct blockif_ctxt *bc, struct blockif_elem *be)
blockif_proc(struct blockif_queue *bq, struct blockif_elem *be)
{
struct blockif_req *br;
struct blockif_ctxt *bc;
ssize_t len;
int err;
br = be->req;
bc = bq->bc;
err = 0;
switch (be->op) {
case BOP_READ:
@ -379,29 +389,29 @@ blockif_proc(struct blockif_ctxt *bc, struct blockif_elem *be)
static void *
blockif_thr(void *arg)
{
struct blockif_ctxt *bc;
struct blockif_queue *bq;
struct blockif_elem *be;
pthread_t t;
bc = arg;
bq = arg;
t = pthread_self();
pthread_mutex_lock(&bc->mtx);
pthread_mutex_lock(&bq->mtx);
for (;;) {
while (blockif_dequeue(bc, t, &be)) {
pthread_mutex_unlock(&bc->mtx);
blockif_proc(bc, be);
pthread_mutex_lock(&bc->mtx);
blockif_complete(bc, be);
while (blockif_dequeue(bq, t, &be)) {
pthread_mutex_unlock(&bq->mtx);
blockif_proc(bq, be);
pthread_mutex_lock(&bq->mtx);
blockif_complete(bq, be);
}
/* Check ctxt status here to see if exit requested */
if (bc->closing)
if (bq->closing)
break;
pthread_cond_wait(&bc->cond, &bc->mtx);
pthread_cond_wait(&bq->cond, &bq->mtx);
}
pthread_mutex_unlock(&bc->mtx);
pthread_mutex_unlock(&bq->mtx);
pthread_exit(NULL);
return NULL;
}
@ -488,16 +498,16 @@ sub_file_unlock(struct blockif_ctxt *bc)
struct blockif_ctxt *
blockif_open(const char *optstr, const char *ident)
blockif_open(const char *optstr, const char *ident, int queue_num)
{
char tname[MAXCOMLEN + 1];
/* char name[MAXPATHLEN]; */
char *nopt, *xopts, *cp;
struct blockif_ctxt *bc;
struct blockif_ctxt *bc = NULL;
struct stat sbuf;
/* struct diocgattr_arg arg; */
off_t size, psectsz, psectoff;
int fd, i, sectsz;
int fd, i, j, sectsz;
int writeback, ro, candiscard, ssopt, pssopt;
long sz;
long long b;
@ -526,6 +536,9 @@ blockif_open(const char *optstr, const char *ident)
candiscard = 0;
if (queue_num <= 0)
queue_num = 1;
/*
* The first element in the optstring is always a pathname.
* Optional elements follow
@ -737,23 +750,35 @@ blockif_open(const char *optstr, const char *ident)
bc->psectsz = psectsz;
bc->psectoff = psectoff;
bc->wce = writeback;
pthread_mutex_init(&bc->mtx, NULL);
pthread_cond_init(&bc->cond, NULL);
TAILQ_INIT(&bc->freeq);
TAILQ_INIT(&bc->pendq);
TAILQ_INIT(&bc->busyq);
for (i = 0; i < BLOCKIF_MAXREQ; i++) {
bc->reqs[i].status = BST_FREE;
TAILQ_INSERT_HEAD(&bc->freeq, &bc->reqs[i], link);
bc->bq_num = queue_num;
bc->bqs = calloc(bc->bq_num, sizeof(struct blockif_queue));
if (bc->bqs == NULL) {
pr_err("calloc bqs");
goto err;
}
for (i = 0; i < BLOCKIF_NUMTHR; i++) {
if (snprintf(tname, sizeof(tname), "blk-%s-%d",
ident, i) >= sizeof(tname)) {
pr_err("blk thread name too long");
for (j = 0; j < bc->bq_num; j++) {
struct blockif_queue *bq = bc->bqs + j;
bq->bc = bc;
pthread_mutex_init(&bq->mtx, NULL);
pthread_cond_init(&bq->cond, NULL);
TAILQ_INIT(&bq->freeq);
TAILQ_INIT(&bq->pendq);
TAILQ_INIT(&bq->busyq);
for (i = 0; i < BLOCKIF_MAXREQ; i++) {
bq->reqs[i].status = BST_FREE;
TAILQ_INSERT_HEAD(&bq->freeq, &bq->reqs[i], link);
}
for (i = 0; i < BLOCKIF_NUMTHR; i++) {
if (snprintf(tname, sizeof(tname), "blk-%s-%d-%d",
ident, j, i) >= sizeof(tname)) {
pr_err("blk thread name too long");
}
pthread_create(&bq->btid[i], NULL, blockif_thr, bq);
pthread_setname_np(bq->btid[i], tname);
}
pthread_create(&bc->btid[i], NULL, blockif_thr, bc);
pthread_setname_np(bc->btid[i], tname);
}
/* free strdup memory */
@ -767,9 +792,13 @@ err:
/* handle failure case: free strdup memory*/
if (nopt)
free(nopt);
if (fd >= 0)
close(fd);
if (bc) {
if (bc->bqs)
free(bc->bqs);
free(bc);
}
return NULL;
}
@ -777,18 +806,25 @@ static int
blockif_request(struct blockif_ctxt *bc, struct blockif_req *breq,
enum blockop op)
{
struct blockif_queue *bq;
int err;
err = 0;
pthread_mutex_lock(&bc->mtx);
if (!TAILQ_EMPTY(&bc->freeq)) {
if (breq->qidx >= bc->bq_num) {
pr_err("%s: invalid qidx %d\n", __func__, breq->qidx);
return ENOENT;
}
bq = bc->bqs + breq->qidx;
pthread_mutex_lock(&bq->mtx);
if (!TAILQ_EMPTY(&bq->freeq)) {
/*
* Enqueue and inform the block i/o thread
* that there is work available
*/
if (blockif_enqueue(bc, breq, op))
pthread_cond_signal(&bc->cond);
if (blockif_enqueue(bq, breq, op))
pthread_cond_signal(&bq->cond);
} else {
/*
* Callers are not allowed to enqueue more than
@ -798,7 +834,7 @@ blockif_request(struct blockif_ctxt *bc, struct blockif_req *breq,
*/
err = E2BIG;
}
pthread_mutex_unlock(&bc->mtx);
pthread_mutex_unlock(&bq->mtx);
return err;
}
@ -831,12 +867,19 @@ int
blockif_cancel(struct blockif_ctxt *bc, struct blockif_req *breq)
{
struct blockif_elem *be;
struct blockif_queue *bq;
pthread_mutex_lock(&bc->mtx);
if (breq->qidx >= bc->bq_num) {
pr_err("%s: invalid qidx %d\n", __func__, breq->qidx);
return ENOENT;
}
bq = bc->bqs + breq->qidx;
pthread_mutex_lock(&bq->mtx);
/*
* Check pending requests.
*/
TAILQ_FOREACH(be, &bc->pendq, link) {
TAILQ_FOREACH(be, &bq->pendq, link) {
if (be->req == breq)
break;
}
@ -844,8 +887,8 @@ blockif_cancel(struct blockif_ctxt *bc, struct blockif_req *breq)
/*
* Found it.
*/
blockif_complete(bc, be);
pthread_mutex_unlock(&bc->mtx);
blockif_complete(bq, be);
pthread_mutex_unlock(&bq->mtx);
return 0;
}
@ -853,7 +896,7 @@ blockif_cancel(struct blockif_ctxt *bc, struct blockif_req *breq)
/*
* Check in-flight requests.
*/
TAILQ_FOREACH(be, &bc->busyq, link) {
TAILQ_FOREACH(be, &bq->busyq, link) {
if (be->req == breq)
break;
}
@ -861,7 +904,7 @@ blockif_cancel(struct blockif_ctxt *bc, struct blockif_req *breq)
/*
* Didn't find it.
*/
pthread_mutex_unlock(&bc->mtx);
pthread_mutex_unlock(&bq->mtx);
return -1;
}
@ -893,7 +936,7 @@ blockif_cancel(struct blockif_ctxt *bc, struct blockif_req *breq)
pthread_mutex_unlock(&bse.mtx);
}
pthread_mutex_unlock(&bc->mtx);
pthread_mutex_unlock(&bq->mtx);
/*
* The processing thread has been interrupted. Since it's not
@ -906,20 +949,24 @@ int
blockif_close(struct blockif_ctxt *bc)
{
void *jval;
int i;
int i, j;
sub_file_unlock(bc);
/*
* Stop the block i/o thread
*/
pthread_mutex_lock(&bc->mtx);
bc->closing = 1;
pthread_cond_broadcast(&bc->cond);
pthread_mutex_unlock(&bc->mtx);
for (j = 0; j < bc->bq_num; j++) {
struct blockif_queue *bq = bc->bqs + j;
for (i = 0; i < BLOCKIF_NUMTHR; i++)
pthread_join(bc->btid[i], &jval);
pthread_mutex_lock(&bq->mtx);
bq->closing = 1;
pthread_cond_broadcast(&bq->cond);
pthread_mutex_unlock(&bq->mtx);
for (i = 0; i < BLOCKIF_NUMTHR; i++)
pthread_join(bq->btid[i], &jval);
}
/* XXX Cancel queued i/o's ??? */
@ -927,6 +974,8 @@ blockif_close(struct blockif_ctxt *bc)
* Release resources
*/
close(bc->fd);
if (bc->bqs)
free(bc->bqs);
free(bc);
return 0;

View File

@ -2405,7 +2405,7 @@ pci_ahci_init(struct vmctx *ctx, struct pci_vdev *dev, char *opts, int atapi)
*/
snprintf(bident, sizeof(bident), "%02x:%02x:%02x", dev->slot,
dev->func, p);
bctxt = blockif_open(opts, bident);
bctxt = blockif_open(opts, bident, 1);
if (bctxt == NULL) {
ahci_dev->ports = p;
ret = 1;

View File

@ -545,7 +545,7 @@ virtio_blk_init(struct vmctx *ctx, struct pci_vdev *dev, char *opts)
break;
}
}
bctxt = blockif_open(p, bident);
bctxt = blockif_open(p, bident, num_vqs);
if (bctxt == NULL) {
pr_err("Could not open backing file");
free(opts_start);
@ -804,7 +804,7 @@ virtio_blk_rescan(struct vmctx *ctx, struct pci_vdev *dev, char *newpath)
pr_err("name=%s, Path=%s, ident=%s\n", dev->name, newpath, bident);
/* update the bctxt for the virtio-blk device */
bctxt = blockif_open(newpath, bident);
bctxt = blockif_open(newpath, bident, blk->num_vqs);
if (bctxt == NULL) {
pr_err("Error opening backing file\n");
goto end;

View File

@ -52,7 +52,7 @@ struct blockif_req {
};
struct blockif_ctxt;
struct blockif_ctxt *blockif_open(const char *optstr, const char *ident);
struct blockif_ctxt *blockif_open(const char *optstr, const char *ident, int queue_num);
off_t blockif_size(struct blockif_ctxt *bc);
void blockif_chs(struct blockif_ctxt *bc, uint16_t *c, uint8_t *h,
uint8_t *s);