mirror of
https://github.com/kata-containers/kata-containers.git
synced 2025-06-28 16:27:50 +00:00
runtime-rs: timeout for shim management client
Let client side support timeout if the timeout value is set. If timeout not set, execute directly. Fixes: #5114 Signed-off-by: Ji-Xinyou <jerryji0414@outlook.com>
This commit is contained in:
parent
9f13496e13
commit
5add50aea2
@ -12,5 +12,5 @@ logging::logger_with_subsystem!(sl, "runtimes");
|
|||||||
pub mod manager;
|
pub mod manager;
|
||||||
pub use manager::RuntimeHandlerManager;
|
pub use manager::RuntimeHandlerManager;
|
||||||
mod shim_mgmt;
|
mod shim_mgmt;
|
||||||
pub use shim_mgmt::client::MgmtClient;
|
pub use shim_mgmt::{client::MgmtClient, server::sb_storage_path};
|
||||||
mod static_resource;
|
mod static_resource;
|
||||||
|
@ -8,10 +8,7 @@ use std::sync::Arc;
|
|||||||
|
|
||||||
use anyhow::{anyhow, Context, Result};
|
use anyhow::{anyhow, Context, Result};
|
||||||
|
|
||||||
use crate::{
|
use crate::{shim_mgmt::server::MgmtServer, static_resource::StaticResourceManager};
|
||||||
shim_mgmt::server::MgmtServer,
|
|
||||||
static_resource::StaticResourceManager,
|
|
||||||
};
|
|
||||||
use common::{
|
use common::{
|
||||||
message::Message,
|
message::Message,
|
||||||
types::{Request, Response},
|
types::{Request, Response},
|
||||||
|
@ -4,8 +4,6 @@
|
|||||||
// SPDX-License-Identifier: Apache-2.0
|
// SPDX-License-Identifier: Apache-2.0
|
||||||
//
|
//
|
||||||
|
|
||||||
#![allow(dead_code)]
|
|
||||||
|
|
||||||
// Defines the general client functions used by other components acting like
|
// Defines the general client functions used by other components acting like
|
||||||
// clients. To be specific, a client first connect to the socket, then send
|
// clients. To be specific, a client first connect to the socket, then send
|
||||||
// request to destined URL, and finally handle the request(or not)
|
// request to destined URL, and finally handle the request(or not)
|
||||||
@ -13,19 +11,20 @@
|
|||||||
use std::{path::Path, path::PathBuf, time::Duration};
|
use std::{path::Path, path::PathBuf, time::Duration};
|
||||||
|
|
||||||
use super::server::mgmt_socket_addr;
|
use super::server::mgmt_socket_addr;
|
||||||
use anyhow::{Context, Result};
|
use anyhow::{anyhow, Context, Result};
|
||||||
use hyper::{Body, Client, Method, Request, Response};
|
use hyper::{Body, Client, Response};
|
||||||
use hyperlocal::{UnixClientExt, UnixConnector, Uri};
|
use hyperlocal::{UnixClientExt, UnixConnector, Uri};
|
||||||
|
|
||||||
/// Shim management client with timeout
|
/// Shim management client with timeout
|
||||||
pub struct MgmtClient {
|
pub struct MgmtClient {
|
||||||
/// The socket *file path* on host file system
|
/// The socket *file path* on host file system
|
||||||
s_path: PathBuf,
|
sock_path: PathBuf,
|
||||||
|
|
||||||
/// The http client connect to the long standing shim mgmt server
|
/// The http client connect to the long standing shim mgmt server
|
||||||
client: Client<UnixConnector, Body>,
|
client: Client<UnixConnector, Body>,
|
||||||
|
|
||||||
/// timeout value for each dial
|
/// Timeout value for each dial, usually 200ms will be enough
|
||||||
|
/// For heavier workload, you may want longer timeout
|
||||||
timeout: Option<Duration>,
|
timeout: Option<Duration>,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -36,10 +35,10 @@ impl MgmtClient {
|
|||||||
let s_addr = unix_socket_path
|
let s_addr = unix_socket_path
|
||||||
.strip_prefix("unix:")
|
.strip_prefix("unix:")
|
||||||
.context("failed to strix prefix")?;
|
.context("failed to strix prefix")?;
|
||||||
let s_path = Path::new("/").join(s_addr).as_path().to_owned();
|
let sock_path = Path::new("/").join(s_addr).as_path().to_owned();
|
||||||
let client = Client::unix();
|
let client = Client::unix();
|
||||||
Ok(Self {
|
Ok(Self {
|
||||||
s_path,
|
sock_path,
|
||||||
client,
|
client,
|
||||||
timeout,
|
timeout,
|
||||||
})
|
})
|
||||||
@ -48,32 +47,15 @@ impl MgmtClient {
|
|||||||
/// The http GET method for client, return a raw response. Further handling should be done by caller.
|
/// The http GET method for client, return a raw response. Further handling should be done by caller.
|
||||||
/// Parameter uri should be like "/agent-url" etc.
|
/// Parameter uri should be like "/agent-url" etc.
|
||||||
pub async fn get(&self, uri: &str) -> Result<Response<Body>> {
|
pub async fn get(&self, uri: &str) -> Result<Response<Body>> {
|
||||||
let url: hyper::Uri = Uri::new(&self.s_path, uri).into();
|
let url: hyper::Uri = Uri::new(&self.sock_path, uri).into();
|
||||||
let response = self.client.get(url).await.context("failed to GET")?;
|
let work = self.client.get(url);
|
||||||
Ok(response)
|
match self.timeout {
|
||||||
}
|
Some(timeout) => match tokio::time::timeout(timeout, work).await {
|
||||||
|
Ok(result) => result.map_err(|e| anyhow!(e)),
|
||||||
/// The http PUT method for client
|
Err(_) => Err(anyhow!("TIMEOUT")),
|
||||||
pub async fn put(&self, uri: &str) -> Result<Response<Body>> {
|
},
|
||||||
let url: hyper::Uri = Uri::new(&self.s_path, uri).into();
|
// if timeout not set, work executes directly
|
||||||
let req = Request::builder()
|
None => work.await.context("failed to GET"),
|
||||||
.method(Method::PUT)
|
}
|
||||||
.uri(url)
|
|
||||||
.body(Body::from(""))
|
|
||||||
.expect("request builder");
|
|
||||||
let response = self.client.request(req).await?;
|
|
||||||
Ok(response)
|
|
||||||
}
|
|
||||||
|
|
||||||
/// The http POST method for client
|
|
||||||
pub async fn post(&self, uri: &str) -> Result<Response<Body>> {
|
|
||||||
let url: hyper::Uri = Uri::new(&self.s_path, uri).into();
|
|
||||||
let req = Request::builder()
|
|
||||||
.method(Method::POST)
|
|
||||||
.uri(url)
|
|
||||||
.body(Body::from(""))
|
|
||||||
.expect("request builder");
|
|
||||||
let response = self.client.request(req).await?;
|
|
||||||
Ok(response)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -4,6 +4,6 @@
|
|||||||
// SPDX-License-Identifier: Apache-2.0
|
// SPDX-License-Identifier: Apache-2.0
|
||||||
//
|
//
|
||||||
|
|
||||||
|
pub mod client;
|
||||||
mod handlers;
|
mod handlers;
|
||||||
pub mod server;
|
pub mod server;
|
||||||
pub mod client;
|
|
||||||
|
@ -21,7 +21,7 @@ use tokio::net::UnixListener;
|
|||||||
use super::handlers::handler_mux;
|
use super::handlers::handler_mux;
|
||||||
|
|
||||||
pub(crate) const DIRECT_VOLUMN_PATH_KEY: &str = "path";
|
pub(crate) const DIRECT_VOLUMN_PATH_KEY: &str = "path";
|
||||||
pub(crate) const DIRECT_VOLUMN_STAT_URL: &str = "/direct-volumn/stats";
|
pub(crate) const DIRECT_VOLUMN_STATS_URL: &str = "/direct-volumn/stats";
|
||||||
pub(crate) const DIRECT_VOLUMN_RESIZE_URL: &str = "/direct-volumn/resize";
|
pub(crate) const DIRECT_VOLUMN_RESIZE_URL: &str = "/direct-volumn/resize";
|
||||||
pub(crate) const AGENT_URL: &str = "/agent-url";
|
pub(crate) const AGENT_URL: &str = "/agent-url";
|
||||||
pub(crate) const IP_TABLE_URL: &str = "/iptables";
|
pub(crate) const IP_TABLE_URL: &str = "/iptables";
|
||||||
@ -53,10 +53,10 @@ impl MgmtServer {
|
|||||||
// TODO(when metrics is supported): register sandbox metrics
|
// TODO(when metrics is supported): register sandbox metrics
|
||||||
// running management http server in an infinite loop, able to serve concurrent requests
|
// running management http server in an infinite loop, able to serve concurrent requests
|
||||||
pub async fn run(self: Arc<Self>) {
|
pub async fn run(self: Arc<Self>) {
|
||||||
let lsnr = lsnr_from_path(self.s_addr.clone()).await.unwrap();
|
let listener = listener_from_path(self.s_addr.clone()).await.unwrap();
|
||||||
// start an infinate loop, which serves the incomming uds stream
|
// start an infinite loop, which serves the incomming uds stream
|
||||||
loop {
|
loop {
|
||||||
let (stream, _) = lsnr.accept().await.unwrap();
|
let (stream, _) = listener.accept().await.unwrap();
|
||||||
let me = self.clone();
|
let me = self.clone();
|
||||||
// spawn a light weight thread to multiplex to the handler
|
// spawn a light weight thread to multiplex to the handler
|
||||||
tokio::task::spawn(async move {
|
tokio::task::spawn(async move {
|
||||||
@ -91,7 +91,7 @@ pub fn mgmt_socket_addr(sid: String) -> String {
|
|||||||
|
|
||||||
// from path, return a unix listener corresponding to that path,
|
// from path, return a unix listener corresponding to that path,
|
||||||
// if the path(socket file) is not created, we create that here
|
// if the path(socket file) is not created, we create that here
|
||||||
async fn lsnr_from_path(path: String) -> Result<UnixListener> {
|
async fn listener_from_path(path: String) -> Result<UnixListener> {
|
||||||
// create the socket if not present
|
// create the socket if not present
|
||||||
let trim_path = path.strip_prefix("unix:").context("trim path")?;
|
let trim_path = path.strip_prefix("unix:").context("trim path")?;
|
||||||
let file_path = Path::new("/").join(trim_path);
|
let file_path = Path::new("/").join(trim_path);
|
||||||
@ -103,3 +103,15 @@ async fn lsnr_from_path(path: String) -> Result<UnixListener> {
|
|||||||
info!(sl!(), "mgmt-svr: binding to path {}", path);
|
info!(sl!(), "mgmt-svr: binding to path {}", path);
|
||||||
UnixListener::bind(file_path).context("bind address")
|
UnixListener::bind(file_path).context("bind address")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
#[cfg(test)]
|
||||||
|
mod tests {
|
||||||
|
use super::*;
|
||||||
|
|
||||||
|
#[test]
|
||||||
|
fn mgmt_svr_test_sock_addr() {
|
||||||
|
let sid = String::from("414123");
|
||||||
|
let addr = mgmt_socket_addr(sid);
|
||||||
|
assert_eq!(addr, "unix:///run/kata/414123/shim-monitor.sock");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user