Merge pull request #5133 from openanolis/shimmgmt

feat(Shimmgmt): Shim management server and client
This commit is contained in:
Zhongtao Hu 2022-09-20 14:37:19 +08:00 committed by GitHub
commit fc65e96ad5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
13 changed files with 365 additions and 1 deletions

View File

@ -1169,12 +1169,62 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "http"
version = "0.2.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "75f43d41e26995c17e71ee126451dd3941010b0514a81a9d11f3b341debc2399"
dependencies = [
"bytes 1.1.0",
"fnv",
"itoa",
]
[[package]]
name = "http-body"
version = "0.4.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d5f38f16d184e36f2408a55281cd658ecbd3ca05cce6d6510a176eca393e26d1"
dependencies = [
"bytes 1.1.0",
"http",
"pin-project-lite",
]
[[package]]
name = "httparse"
version = "1.8.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d897f394bad6a705d5f4104762e116a75639e470d80901eed05a860a95cb1904"
[[package]] [[package]]
name = "httpdate" name = "httpdate"
version = "1.0.2" version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421" checksum = "c4a1e36c821dbe04574f602848a19f742f4fb3c98d40449f11bcad18d6b17421"
[[package]]
name = "hyper"
version = "0.14.20"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "02c929dc5c39e335a03c405292728118860721b10190d98c2a0f0efd5baafbac"
dependencies = [
"bytes 1.1.0",
"futures-channel",
"futures-core",
"futures-util",
"http",
"http-body",
"httparse",
"httpdate",
"itoa",
"pin-project-lite",
"tokio",
"tower-service",
"tracing",
"want",
]
[[package]] [[package]]
name = "hypervisor" name = "hypervisor"
version = "0.1.0" version = "0.1.0"
@ -2217,6 +2267,7 @@ version = "0.1.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"common", "common",
"hyper",
"kata-types", "kata-types",
"lazy_static", "lazy_static",
"linux_container", "linux_container",
@ -2770,6 +2821,38 @@ dependencies = [
"serde", "serde",
] ]
[[package]]
name = "tower-service"
version = "0.3.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "b6bc1c9ce2b5135ac7f93c72918fc37feb872bdc6a5533a8b85eb4b86bfdae52"
[[package]]
name = "tracing"
version = "0.1.35"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a400e31aa60b9d44a52a8ee0343b5b18566b03a8321e0d321f695cf56e940160"
dependencies = [
"cfg-if 1.0.0",
"pin-project-lite",
"tracing-core",
]
[[package]]
name = "tracing-core"
version = "0.1.28"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7b7358be39f2f274f322d2aaed611acc57f382e8eb1e5b48cb9ae30933495ce7"
dependencies = [
"once_cell",
]
[[package]]
name = "try-lock"
version = "0.2.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "59547bce71d9c38b83d9c0e92b6066c4253371f15005def0c30d9657f50c7642"
[[package]] [[package]]
name = "ttrpc" name = "ttrpc"
version = "0.6.1" version = "0.6.1"
@ -2994,6 +3077,16 @@ version = "1.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca" checksum = "9d5b2c62b4012a3e1eca5a7e077d13b3bf498c4073e33ccd58626607748ceeca"
[[package]]
name = "want"
version = "0.3.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1ce8a968cb1cd110d136ff8b819a556d6fb6d919363c61534f6860c7eb172ba0"
dependencies = [
"log",
"try-lock",
]
[[package]] [[package]]
name = "wasi" name = "wasi"
version = "0.9.0+wasi-snapshot-preview1" version = "0.9.0+wasi-snapshot-preview1"

View File

@ -40,6 +40,10 @@ impl AgentManager for KataAgent {
self.stop_log_forwarder().await; self.stop_log_forwarder().await;
} }
async fn agent_sock(&self) -> Result<String> {
self.agent_sock().await
}
async fn agent_config(&self) -> AgentConfig { async fn agent_config(&self) -> AgentConfig {
self.agent_config().await self.agent_config().await
} }

View File

@ -127,6 +127,11 @@ impl KataAgent {
inner.log_forwarder.stop(); inner.log_forwarder.stop();
} }
pub(crate) async fn agent_sock(&self) -> Result<String> {
let inner = self.inner.lock().await;
Ok(inner.socket_address.clone())
}
pub(crate) async fn agent_config(&self) -> AgentConfig { pub(crate) async fn agent_config(&self) -> AgentConfig {
let inner = self.inner.lock().await; let inner = self.inner.lock().await;
inner.config.clone() inner.config.clone()

View File

@ -38,6 +38,7 @@ pub trait AgentManager: Send + Sync {
async fn start(&self, address: &str) -> Result<()>; async fn start(&self, address: &str) -> Result<()>;
async fn stop(&self); async fn stop(&self);
async fn agent_sock(&self) -> Result<String>;
async fn agent_config(&self) -> AgentConfig; async fn agent_config(&self) -> AgentConfig;
} }

View File

@ -11,6 +11,8 @@ lazy_static = "1.4.0"
slog = "2.5.2" slog = "2.5.2"
slog-scope = "4.4.0" slog-scope = "4.4.0"
tokio = { version = "1.8.0", features = ["rt-multi-thread"] } tokio = { version = "1.8.0", features = ["rt-multi-thread"] }
hyper = { version = "0.14.20", features = ["stream", "server", "http1"] }
hyperlocal = "0.8"
common = { path = "./common" } common = { path = "./common" }
kata-types = { path = "../../../libs/kata-types" } kata-types = { path = "../../../libs/kata-types" }

View File

@ -13,4 +13,7 @@ pub trait Sandbox: Send + Sync {
async fn stop(&self) -> Result<()>; async fn stop(&self) -> Result<()>;
async fn cleanup(&self, container_id: &str) -> Result<()>; async fn cleanup(&self, container_id: &str) -> Result<()>;
async fn shutdown(&self) -> Result<()>; async fn shutdown(&self) -> Result<()>;
// agent function
async fn agent_sock(&self) -> Result<String>;
} }

View File

@ -11,4 +11,6 @@ logging::logger_with_subsystem!(sl, "runtimes");
pub mod manager; pub mod manager;
pub use manager::RuntimeHandlerManager; pub use manager::RuntimeHandlerManager;
mod shim_mgmt;
pub use shim_mgmt::{client::MgmtClient, server::sb_storage_path};
mod static_resource; mod static_resource;

View File

@ -8,7 +8,7 @@ use std::sync::Arc;
use anyhow::{anyhow, Context, Result}; use anyhow::{anyhow, Context, Result};
use crate::static_resource::StaticResourceManager; use crate::{shim_mgmt::server::MgmtServer, static_resource::StaticResourceManager};
use common::{ use common::{
message::Message, message::Message,
types::{Request, Response}, types::{Request, Response},
@ -109,6 +109,16 @@ impl RuntimeHandlerManagerInner {
.await .await
.context("init runtime handler")?; .context("init runtime handler")?;
// the sandbox creation can reach here only once and the sandbox is created
// so we can safely create the shim management socket right now
// the unwrap here is safe because the runtime handler is correctly created
let shim_mgmt_svr = MgmtServer::new(
&self.id,
self.runtime_instance.as_ref().unwrap().sandbox.clone(),
);
tokio::task::spawn(Arc::new(shim_mgmt_svr).run());
info!(sl!(), "shim management http server starts");
Ok(()) Ok(())
} }
@ -196,6 +206,7 @@ impl RuntimeHandlerManager {
.create_container(req, spec) .create_container(req, spec)
.await .await
.context("create container")?; .context("create container")?;
Ok(Response::CreateContainer(shim_pid)) Ok(Response::CreateContainer(shim_pid))
} else { } else {
self.handler_request(req).await.context("handler request") self.handler_request(req).await.context("handler request")

View File

@ -0,0 +1,61 @@
// Copyright (c) 2019-2022 Alibaba Cloud
// Copyright (c) 2019-2022 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//
// Defines the general client functions used by other components acting like
// clients. To be specific, a client first connect to the socket, then send
// request to destined URL, and finally handle the request(or not)
use std::{path::Path, path::PathBuf, time::Duration};
use super::server::mgmt_socket_addr;
use anyhow::{anyhow, Context, Result};
use hyper::{Body, Client, Response};
use hyperlocal::{UnixClientExt, UnixConnector, Uri};
/// Shim management client with timeout
pub struct MgmtClient {
/// The socket *file path* on host file system
sock_path: PathBuf,
/// The http client connect to the long standing shim mgmt server
client: Client<UnixConnector, Body>,
/// Timeout value for each dial, usually 200ms will be enough
/// For heavier workload, you may want longer timeout
timeout: Option<Duration>,
}
impl MgmtClient {
/// Construct a new client connecting to shim mgmt server
pub fn new(sid: String, timeout: Option<Duration>) -> Result<Self> {
let unix_socket_path = mgmt_socket_addr(sid);
let s_addr = unix_socket_path
.strip_prefix("unix:")
.context("failed to strix prefix")?;
let sock_path = Path::new("/").join(s_addr).as_path().to_owned();
let client = Client::unix();
Ok(Self {
sock_path,
client,
timeout,
})
}
/// The http GET method for client, return a raw response. Further handling should be done by caller.
/// Parameter uri should be like "/agent-url" etc.
pub async fn get(&self, uri: &str) -> Result<Response<Body>> {
let url: hyper::Uri = Uri::new(&self.sock_path, uri).into();
let work = self.client.get(url);
match self.timeout {
Some(timeout) => match tokio::time::timeout(timeout, work).await {
Ok(result) => result.map_err(|e| anyhow!(e)),
Err(_) => Err(anyhow!("TIMEOUT")),
},
// if timeout not set, work executes directly
None => work.await.context("failed to GET"),
}
}
}

View File

@ -0,0 +1,52 @@
// Copyright (c) 2019-2022 Alibaba Cloud
// Copyright (c) 2019-2022 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//
// This defines the handlers corresponding to the url when a request is sent to destined url,
// the handler function should be invoked, and the corresponding data will be in the response
use common::Sandbox;
use hyper::{Body, Method, Request, Response, Result, StatusCode};
use std::sync::Arc;
use super::server::AGENT_URL;
// main router for response, this works as a multiplexer on
// http arrival which invokes the corresponding handler function
pub(crate) async fn handler_mux(
sandbox: Arc<dyn Sandbox>,
req: Request<Body>,
) -> Result<Response<Body>> {
info!(
sl!(),
"mgmt-svr(mux): recv req, method: {}, uri: {}",
req.method(),
req.uri().path()
);
match (req.method(), req.uri().path()) {
(&Method::GET, AGENT_URL) => agent_url_handler(sandbox, req).await,
_ => Ok(not_found(req).await),
}
}
// url not found
async fn not_found(_req: Request<Body>) -> Response<Body> {
Response::builder()
.status(StatusCode::NOT_FOUND)
.body(Body::from("URL NOT FOUND"))
.unwrap()
}
// returns the url for agent
async fn agent_url_handler(
sandbox: Arc<dyn Sandbox>,
_req: Request<Body>,
) -> Result<Response<Body>> {
let agent_sock = sandbox
.agent_sock()
.await
.unwrap_or_else(|_| String::from(""));
Ok(Response::new(Body::from(agent_sock)))
}

View File

@ -0,0 +1,9 @@
// Copyright (c) 2019-2022 Alibaba Cloud
// Copyright (c) 2019-2022 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//
pub mod client;
mod handlers;
pub mod server;

View File

@ -0,0 +1,117 @@
// Copyright (c) 2019-2022 Alibaba Cloud
// Copyright (c) 2019-2022 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//
// Shim management service, this service starts a management http server on a socket
// and wire certain URL with a corresponding handler. When a command-line interface
// or further shim functions want the information corresponding to this, it can just
// send a GET request to the url, and the info will be in the response
#![allow(dead_code)] // some url's handler are *to be* developed
use std::{fs, path::Path, sync::Arc};
use anyhow::{Context, Result};
use common::Sandbox;
use hyper::{server::conn::Http, service::service_fn};
use tokio::net::UnixListener;
use super::handlers::handler_mux;
pub(crate) const DIRECT_VOLUMN_PATH_KEY: &str = "path";
pub(crate) const DIRECT_VOLUMN_STATS_URL: &str = "/direct-volumn/stats";
pub(crate) const DIRECT_VOLUMN_RESIZE_URL: &str = "/direct-volumn/resize";
pub(crate) const AGENT_URL: &str = "/agent-url";
pub(crate) const IP_TABLE_URL: &str = "/iptables";
pub(crate) const IP6_TABLE_URL: &str = "/ip6tables";
pub(crate) const METRICS_URL: &str = "/metrics";
const SHIM_MGMT_SOCK_NAME: &str = "shim-monitor.sock";
/// The shim management server instance
pub struct MgmtServer {
/// socket address(with prefix like hvsock://)
pub s_addr: String,
/// The sandbox instance
pub sandbox: Arc<dyn Sandbox>,
}
impl MgmtServer {
/// construct a new management server
pub fn new(sid: &str, sandbox: Arc<dyn Sandbox>) -> Self {
Self {
s_addr: mgmt_socket_addr(sid.to_owned()),
sandbox,
}
}
// TODO(when metrics is supported): write metric addresses to fs
// TODO(when metrics is supported): register shim metrics
// TODO(when metrics is supported): register sandbox metrics
// running management http server in an infinite loop, able to serve concurrent requests
pub async fn run(self: Arc<Self>) {
let listener = listener_from_path(self.s_addr.clone()).await.unwrap();
// start an infinite loop, which serves the incomming uds stream
loop {
let (stream, _) = listener.accept().await.unwrap();
let me = self.clone();
// spawn a light weight thread to multiplex to the handler
tokio::task::spawn(async move {
if let Err(err) = Http::new()
.serve_connection(
stream,
service_fn(|request| handler_mux(me.sandbox.clone(), request)),
)
.await
{
warn!(sl!(), "Failed to serve connection: {:?}", err);
}
});
}
}
}
// return sandbox's storage path
pub fn sb_storage_path() -> String {
String::from("/run/kata")
}
// returns the address of the unix domain socket(UDS) for communication with shim
// management service using http
// normally returns "unix:///run/kata/{sid}/shim_monitor.sock"
pub fn mgmt_socket_addr(sid: String) -> String {
let p = Path::new(&sb_storage_path())
.join(sid)
.join(SHIM_MGMT_SOCK_NAME);
format!("unix://{}", p.to_string_lossy())
}
// from path, return a unix listener corresponding to that path,
// if the path(socket file) is not created, we create that here
async fn listener_from_path(path: String) -> Result<UnixListener> {
// create the socket if not present
let trim_path = path.strip_prefix("unix:").context("trim path")?;
let file_path = Path::new("/").join(trim_path);
let file_path = file_path.as_path();
if let Some(parent_dir) = file_path.parent() {
fs::create_dir_all(parent_dir).context("create parent dir")?;
}
// bind the socket and return the listener
info!(sl!(), "mgmt-svr: binding to path {}", path);
UnixListener::bind(file_path).context("bind address")
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn mgmt_svr_test_sock_addr() {
let sid = String::from("414123");
let addr = mgmt_socket_addr(sid);
assert_eq!(addr, "unix:///run/kata/414123/shim-monitor.sock");
}
}

View File

@ -262,6 +262,10 @@ impl Sandbox for VirtSandbox {
// TODO: cleanup other snadbox resource // TODO: cleanup other snadbox resource
Ok(()) Ok(())
} }
async fn agent_sock(&self) -> Result<String> {
self.agent.agent_sock().await
}
} }
#[async_trait] #[async_trait]