diff --git a/src/runtime-rs/crates/agent/src/kata/agent.rs b/src/runtime-rs/crates/agent/src/kata/agent.rs index 52afbee9de..6e9e1fd107 100644 --- a/src/runtime-rs/crates/agent/src/kata/agent.rs +++ b/src/runtime-rs/crates/agent/src/kata/agent.rs @@ -40,6 +40,10 @@ impl AgentManager for KataAgent { self.stop_log_forwarder().await; } + async fn agent_sock(&self) -> Result { + self.agent_sock().await + } + async fn agent_config(&self) -> AgentConfig { self.agent_config().await } diff --git a/src/runtime-rs/crates/agent/src/kata/mod.rs b/src/runtime-rs/crates/agent/src/kata/mod.rs index 2a9283c991..fc4fdef58c 100644 --- a/src/runtime-rs/crates/agent/src/kata/mod.rs +++ b/src/runtime-rs/crates/agent/src/kata/mod.rs @@ -127,6 +127,11 @@ impl KataAgent { inner.log_forwarder.stop(); } + pub(crate) async fn agent_sock(&self) -> Result { + let inner = self.inner.lock().await; + Ok(inner.socket_address.clone()) + } + pub(crate) async fn agent_config(&self) -> AgentConfig { let inner = self.inner.lock().await; inner.config.clone() diff --git a/src/runtime-rs/crates/agent/src/lib.rs b/src/runtime-rs/crates/agent/src/lib.rs index 2fc1b1e281..8c41d751f5 100644 --- a/src/runtime-rs/crates/agent/src/lib.rs +++ b/src/runtime-rs/crates/agent/src/lib.rs @@ -38,6 +38,7 @@ pub trait AgentManager: Send + Sync { async fn start(&self, address: &str) -> Result<()>; async fn stop(&self); + async fn agent_sock(&self) -> Result; async fn agent_config(&self) -> AgentConfig; } diff --git a/src/runtime-rs/crates/runtimes/common/src/sandbox.rs b/src/runtime-rs/crates/runtimes/common/src/sandbox.rs index fbb5db53bc..269a3586ed 100644 --- a/src/runtime-rs/crates/runtimes/common/src/sandbox.rs +++ b/src/runtime-rs/crates/runtimes/common/src/sandbox.rs @@ -13,4 +13,7 @@ pub trait Sandbox: Send + Sync { async fn stop(&self) -> Result<()>; async fn cleanup(&self, container_id: &str) -> Result<()>; async fn shutdown(&self) -> Result<()>; + + // agent function + async fn agent_sock(&self) -> Result; } diff --git a/src/runtime-rs/crates/runtimes/src/manager.rs b/src/runtime-rs/crates/runtimes/src/manager.rs index 14e9151f2a..b0773c5f17 100644 --- a/src/runtime-rs/crates/runtimes/src/manager.rs +++ b/src/runtime-rs/crates/runtimes/src/manager.rs @@ -111,8 +111,13 @@ impl RuntimeHandlerManagerInner { // the sandbox creation can reach here only once and the sandbox is created // so we can safely create the shim management socket right now - let shim_mgmt_svr = MgmtServer::new(&self.id); - shim_mgmt_svr.run().await; + // the unwrap here is safe because the runtime handler is correctly created + let shim_mgmt_svr = MgmtServer::new( + &self.id, + self.runtime_instance.as_ref().unwrap().sandbox.clone(), + ); + tokio::task::spawn(Arc::new(shim_mgmt_svr).run()); + info!(sl!(), "shim management http server starts"); Ok(()) } diff --git a/src/runtime-rs/crates/runtimes/src/shim_mgmt/client.rs b/src/runtime-rs/crates/runtimes/src/shim_mgmt/client.rs new file mode 100644 index 0000000000..08933463cf --- /dev/null +++ b/src/runtime-rs/crates/runtimes/src/shim_mgmt/client.rs @@ -0,0 +1,5 @@ +// Copyright (c) 2019-2022 Alibaba Cloud +// Copyright (c) 2019-2022 Ant Group +// +// SPDX-License-Identifier: Apache-2.0 +// \ No newline at end of file diff --git a/src/runtime-rs/crates/runtimes/src/shim_mgmt/handlers.rs b/src/runtime-rs/crates/runtimes/src/shim_mgmt/handlers.rs index 7c55d8ad4d..57ccf389e5 100644 --- a/src/runtime-rs/crates/runtimes/src/shim_mgmt/handlers.rs +++ b/src/runtime-rs/crates/runtimes/src/shim_mgmt/handlers.rs @@ -4,23 +4,24 @@ // SPDX-License-Identifier: Apache-2.0 // -// This defines the handler corresponding to the url -// when a request is sent to destined url, the handler -// function should be invoked, and the corresponding -// data will be in the response's body -// -// NOTE: ALL HANDLER SHOULD BE ASYNC UNDER ROUTERIFY +// This defines the handlers corresponding to the url when a request is sent to destined url, +// the handler function should be invoked, and the corresponding data will be in the response +use common::Sandbox; use hyper::{Body, Method, Request, Response, Result, StatusCode}; +use std::sync::Arc; use super::server::AGENT_URL; // main router for response, this works as a multiplexer on // http arrival which invokes the corresponding handler function -pub(crate) async fn handler_mux(req: Request) -> Result> { +pub(crate) async fn handler_mux( + sandbox: Arc, + req: Request, +) -> Result> { info!(sl!(), "mgmt-svr(mux): recv req {:?}", req); match (req.method(), req.uri().path()) { - (&Method::GET, AGENT_URL) => agent_url_handler(req).await, + (&Method::GET, AGENT_URL) => agent_url_handler(sandbox, req).await, _ => Ok(not_found(req).await), } } @@ -34,7 +35,13 @@ async fn not_found(_req: Request) -> Response { } // returns the url for agent -async fn agent_url_handler(_req: Request) -> Result> { - // todo - Ok(Response::new(Body::from(""))) +async fn agent_url_handler( + sandbox: Arc, + _req: Request, +) -> Result> { + let agent_sock = sandbox + .agent_sock() + .await + .unwrap_or_else(|_| String::from("")); + Ok(Response::new(Body::from(agent_sock))) } diff --git a/src/runtime-rs/crates/runtimes/src/shim_mgmt/mod.rs b/src/runtime-rs/crates/runtimes/src/shim_mgmt/mod.rs index 67bc890adb..ff053b3370 100644 --- a/src/runtime-rs/crates/runtimes/src/shim_mgmt/mod.rs +++ b/src/runtime-rs/crates/runtimes/src/shim_mgmt/mod.rs @@ -6,3 +6,4 @@ mod handlers; pub mod server; +pub mod client; diff --git a/src/runtime-rs/crates/runtimes/src/shim_mgmt/server.rs b/src/runtime-rs/crates/runtimes/src/shim_mgmt/server.rs index c51b5c45ba..7b1f8cf18b 100644 --- a/src/runtime-rs/crates/runtimes/src/shim_mgmt/server.rs +++ b/src/runtime-rs/crates/runtimes/src/shim_mgmt/server.rs @@ -11,9 +11,10 @@ #![allow(dead_code)] // some url's handler are *to be* developed -use std::{fs, path::Path}; +use std::{fs, path::Path, sync::Arc}; use anyhow::{Context, Result}; +use common::Sandbox; use hyper::{server::conn::Http, service::service_fn}; use tokio::net::UnixListener; @@ -29,42 +30,47 @@ pub(crate) const METRICS_URL: &str = "/metrics"; const SHIM_MGMT_SOCK_NAME: &str = "shim-monitor.sock"; -// The management server -#[derive(Debug)] +/// The shim management server instance pub struct MgmtServer { - // socket address + /// socket address(with prefix like hvsock://) pub s_addr: String, + + /// The sandbox instance + pub sandbox: Arc, } impl MgmtServer { - pub fn new(sid: &str) -> Self { + /// construct a new management server + pub fn new(sid: &str, sandbox: Arc) -> Self { Self { s_addr: mgmt_socket_addr(sid.to_owned()), + sandbox, } } // TODO(when metrics is supported): write metric addresses to fs // TODO(when metrics is supported): register shim metrics // TODO(when metrics is supported): register sandbox metrics - // start a new thread, running management http server in a dead loop - pub async fn run(&self) { + // running management http server in an infinite loop, able to serve concurrent requests + pub async fn run(self: Arc) { let lsnr = lsnr_from_path(self.s_addr.clone()).await.unwrap(); - // start an infinate loop, which serves the incomming uds stream - tokio::task::spawn(async move { - loop { - let (stream, _) = lsnr.accept().await.unwrap(); - // spawn a light weight thread to multiplex to the handler - tokio::task::spawn(async move { - if let Err(err) = Http::new() - .serve_connection(stream, service_fn(handler_mux)) - .await - { - warn!(sl!(), "Failed to serve connection: {:?}", err); - } - }); - } - }); + loop { + let (stream, _) = lsnr.accept().await.unwrap(); + let me = self.clone(); + // spawn a light weight thread to multiplex to the handler + tokio::task::spawn(async move { + if let Err(err) = Http::new() + .serve_connection( + stream, + service_fn(|request| handler_mux(me.sandbox.clone(), request)), + ) + .await + { + warn!(sl!(), "Failed to serve connection: {:?}", err); + } + }); + } } } @@ -86,12 +92,14 @@ pub fn mgmt_socket_addr(sid: String) -> String { // from path, return a unix listener corresponding to that path, // if the path(socket file) is not created, we create that here async fn lsnr_from_path(path: String) -> Result { + // create the socket if not present let trim_path = path.strip_prefix("unix:").context("trim path")?; let file_path = Path::new("/").join(trim_path); let file_path = file_path.as_path(); if let Some(parent_dir) = file_path.parent() { fs::create_dir_all(parent_dir).context("create parent dir")?; } + // bind the socket and return the listener info!(sl!(), "mgmt-svr: binding to path {}", path); UnixListener::bind(file_path).context("bind address") } diff --git a/src/runtime-rs/crates/runtimes/virt_container/src/sandbox.rs b/src/runtime-rs/crates/runtimes/virt_container/src/sandbox.rs index e258ec7c72..ebe0a4e204 100644 --- a/src/runtime-rs/crates/runtimes/virt_container/src/sandbox.rs +++ b/src/runtime-rs/crates/runtimes/virt_container/src/sandbox.rs @@ -262,6 +262,10 @@ impl Sandbox for VirtSandbox { // TODO: cleanup other snadbox resource Ok(()) } + + async fn agent_sock(&self) -> Result { + self.agent.agent_sock().await + } } #[async_trait]