mirror of
https://github.com/kata-containers/kata-containers.git
synced 2026-07-01 14:38:33 +00:00
Merge pull request #12959 from DataDog/mayeul/fix-race-condition-when-adding-qdisc
shim: Add backoff retry to ingress qdisc creation to avoid potential race condition
This commit is contained in:
@@ -4,6 +4,8 @@
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
//
|
||||
|
||||
use std::time::Duration;
|
||||
|
||||
use anyhow::{Context, Result};
|
||||
use async_trait::async_trait;
|
||||
use rtnetlink::Handle;
|
||||
@@ -12,6 +14,9 @@ use scopeguard::defer;
|
||||
use super::{NetworkModel, NetworkModelType};
|
||||
use crate::network::NetworkPair;
|
||||
|
||||
/// Total number of attempts made when adding an ingress qdisc (the first try included).
const QDISC_ADD_ATTEMPTS: u64 = 5;
|
||||
/// Base delay, in milliseconds, for the linear backoff between qdisc add retries on EBUSY.
/// The delay before each retry is this value multiplied by the retry number.
const QDISC_ADD_BACKOFF_MS: u64 = 10;
|
||||
|
||||
/// Field-less type that carries the `NetworkModel` implementation in this file,
/// which installs ingress qdiscs on the tap and virt devices.
#[derive(Debug)]
|
||||
pub(crate) struct TcFilterModel {}
|
||||
|
||||
@@ -42,19 +47,11 @@ impl NetworkModel for TcFilterModel {
|
||||
.await
|
||||
.context("fetch virt by index")?;
|
||||
|
||||
handle
|
||||
.qdisc()
|
||||
.add(tap_index as i32)
|
||||
.ingress()
|
||||
.execute()
|
||||
add_ingress_qdisc(&handle, tap_index as i32)
|
||||
.await
|
||||
.context("add tap ingress")?;
|
||||
|
||||
handle
|
||||
.qdisc()
|
||||
.add(virt_index as i32)
|
||||
.ingress()
|
||||
.execute()
|
||||
add_ingress_qdisc(&handle, virt_index as i32)
|
||||
.await
|
||||
.context("add virt ingress")?;
|
||||
|
||||
@@ -95,6 +92,29 @@ impl NetworkModel for TcFilterModel {
|
||||
}
|
||||
}
|
||||
|
||||
/// Add an ingress qdisc to the device at the given index, retrying on EBUSY
|
||||
/// with linear backoff (10ms, 20ms, ...).
|
||||
async fn add_ingress_qdisc(handle: &Handle, index: i32) -> Result<(), rtnetlink::Error> {
|
||||
let mut last_err = handle.qdisc().add(index).ingress().execute().await;
|
||||
for i in 1..QDISC_ADD_ATTEMPTS {
|
||||
match &last_err {
|
||||
Ok(()) => return Ok(()),
|
||||
Err(e) if !is_ebusy(e) => break,
|
||||
Err(_) => {}
|
||||
}
|
||||
tokio::time::sleep(Duration::from_millis(QDISC_ADD_BACKOFF_MS * i)).await;
|
||||
last_err = handle.qdisc().add(index).ingress().execute().await;
|
||||
}
|
||||
last_err
|
||||
}
|
||||
|
||||
fn is_ebusy(err: &rtnetlink::Error) -> bool {
|
||||
match err {
|
||||
rtnetlink::Error::NetlinkError(msg) => msg.code.is_some_and(|c| c.get() == -libc::EBUSY),
|
||||
_ => false,
|
||||
}
|
||||
}
|
||||
|
||||
pub async fn fetch_index(handle: &Handle, name: &str) -> Result<u32> {
|
||||
let link = crate::network::network_pair::get_link_by_name(handle, name)
|
||||
.await
|
||||
|
||||
@@ -8,6 +8,7 @@ package virtcontainers
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"net"
|
||||
@@ -18,6 +19,7 @@ import (
|
||||
"sort"
|
||||
"strconv"
|
||||
"strings"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
"github.com/containernetworking/plugins/pkg/ns"
|
||||
@@ -38,6 +40,8 @@ import (
|
||||
const (
|
||||
defaultFilePerms = 0600
|
||||
defaultQlen = 1500
|
||||
qdiscAddAttempts = 5 // Number of attempts when adding an ingress qdisc
|
||||
qdiscAddBackoff = 10 * time.Millisecond // Base delay for the linear backoff between qdisc add retries on EBUSY
|
||||
)
|
||||
|
||||
// LinuxNetwork represents a sandbox networking setup.
|
||||
@@ -1163,12 +1167,20 @@ func addQdiscIngress(index int) error {
|
||||
},
|
||||
}
|
||||
|
||||
err := netlink.QdiscAdd(qdisc)
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to add qdisc for network index %d : %s", index, err)
|
||||
var err error
|
||||
for i := 0; i < qdiscAddAttempts; i++ {
|
||||
err = netlink.QdiscAdd(qdisc)
|
||||
if err == nil {
|
||||
return nil
|
||||
}
|
||||
if !errors.Is(err, syscall.EBUSY) {
|
||||
break // non-retryable error
|
||||
}
|
||||
if i < qdiscAddAttempts-1 {
|
||||
time.Sleep(time.Duration(i+1) * qdiscAddBackoff)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
return fmt.Errorf("Failed to add qdisc for network index %d : %w", index, err)
|
||||
}
|
||||
|
||||
// addRedirectTCFilter adds a tc filter for device with index "sourceIndex".
|
||||
|
||||
Reference in New Issue
Block a user