runtime-rs: shim management - agent-url

Add agent-url to its handler. The general framework of registering URL
handlers is done.

Fixes: #5114
Signed-off-by: Ji-Xinyou <jerryji0414@outlook.com>
This commit is contained in:
Ji-Xinyou 2022-09-07 11:08:05 +08:00
parent 59aeb776b0
commit e891295e10
10 changed files with 78 additions and 35 deletions

View File

@ -40,6 +40,10 @@ impl AgentManager for KataAgent {
self.stop_log_forwarder().await;
}
async fn agent_sock(&self) -> Result<String> {
self.agent_sock().await
}
async fn agent_config(&self) -> AgentConfig {
self.agent_config().await
}

View File

@ -127,6 +127,11 @@ impl KataAgent {
inner.log_forwarder.stop();
}
pub(crate) async fn agent_sock(&self) -> Result<String> {
let inner = self.inner.lock().await;
Ok(inner.socket_address.clone())
}
pub(crate) async fn agent_config(&self) -> AgentConfig {
let inner = self.inner.lock().await;
inner.config.clone()

View File

@ -38,6 +38,7 @@ pub trait AgentManager: Send + Sync {
async fn start(&self, address: &str) -> Result<()>;
async fn stop(&self);
async fn agent_sock(&self) -> Result<String>;
async fn agent_config(&self) -> AgentConfig;
}

View File

@ -13,4 +13,7 @@ pub trait Sandbox: Send + Sync {
async fn stop(&self) -> Result<()>;
async fn cleanup(&self, container_id: &str) -> Result<()>;
async fn shutdown(&self) -> Result<()>;
// agent function
async fn agent_sock(&self) -> Result<String>;
}

View File

@ -111,8 +111,13 @@ impl RuntimeHandlerManagerInner {
// the sandbox creation can reach here only once and the sandbox is created
// so we can safely create the shim management socket right now
let shim_mgmt_svr = MgmtServer::new(&self.id);
shim_mgmt_svr.run().await;
// the unwrap here is safe because the runtime handler is correctly created
let shim_mgmt_svr = MgmtServer::new(
&self.id,
self.runtime_instance.as_ref().unwrap().sandbox.clone(),
);
tokio::task::spawn(Arc::new(shim_mgmt_svr).run());
info!(sl!(), "shim management http server starts");
Ok(())
}

View File

@ -0,0 +1,5 @@
// Copyright (c) 2019-2022 Alibaba Cloud
// Copyright (c) 2019-2022 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//

View File

@ -4,23 +4,24 @@
// SPDX-License-Identifier: Apache-2.0
//
// This defines the handler corresponding to the url
// when a request is sent to destined url, the handler
// function should be invoked, and the corresponding
// data will be in the response's body
//
// NOTE: ALL HANDLER SHOULD BE ASYNC UNDER ROUTERIFY
// This defines the handlers corresponding to the url when a request is sent to destined url,
// the handler function should be invoked, and the corresponding data will be in the response
use common::Sandbox;
use hyper::{Body, Method, Request, Response, Result, StatusCode};
use std::sync::Arc;
use super::server::AGENT_URL;
// main router for response, this works as a multiplexer on
// http arrival which invokes the corresponding handler function
pub(crate) async fn handler_mux(req: Request<Body>) -> Result<Response<Body>> {
pub(crate) async fn handler_mux(
sandbox: Arc<dyn Sandbox>,
req: Request<Body>,
) -> Result<Response<Body>> {
info!(sl!(), "mgmt-svr(mux): recv req {:?}", req);
match (req.method(), req.uri().path()) {
(&Method::GET, AGENT_URL) => agent_url_handler(req).await,
(&Method::GET, AGENT_URL) => agent_url_handler(sandbox, req).await,
_ => Ok(not_found(req).await),
}
}
@ -34,7 +35,13 @@ async fn not_found(_req: Request<Body>) -> Response<Body> {
}
// returns the url for agent
async fn agent_url_handler(_req: Request<Body>) -> Result<Response<Body>> {
// todo
Ok(Response::new(Body::from("")))
async fn agent_url_handler(
sandbox: Arc<dyn Sandbox>,
_req: Request<Body>,
) -> Result<Response<Body>> {
let agent_sock = sandbox
.agent_sock()
.await
.unwrap_or_else(|_| String::from(""));
Ok(Response::new(Body::from(agent_sock)))
}

View File

@ -6,3 +6,4 @@
mod handlers;
pub mod server;
pub mod client;

View File

@ -11,9 +11,10 @@
#![allow(dead_code)] // some url's handler are *to be* developed
use std::{fs, path::Path};
use std::{fs, path::Path, sync::Arc};
use anyhow::{Context, Result};
use common::Sandbox;
use hyper::{server::conn::Http, service::service_fn};
use tokio::net::UnixListener;
@ -29,42 +30,47 @@ pub(crate) const METRICS_URL: &str = "/metrics";
const SHIM_MGMT_SOCK_NAME: &str = "shim-monitor.sock";
// The management server
#[derive(Debug)]
/// The shim management server instance
pub struct MgmtServer {
// socket address
/// socket address(with prefix like hvsock://)
pub s_addr: String,
/// The sandbox instance
pub sandbox: Arc<dyn Sandbox>,
}
impl MgmtServer {
pub fn new(sid: &str) -> Self {
/// construct a new management server
pub fn new(sid: &str, sandbox: Arc<dyn Sandbox>) -> Self {
Self {
s_addr: mgmt_socket_addr(sid.to_owned()),
sandbox,
}
}
// TODO(when metrics is supported): write metric addresses to fs
// TODO(when metrics is supported): register shim metrics
// TODO(when metrics is supported): register sandbox metrics
// start a new thread, running management http server in a dead loop
pub async fn run(&self) {
// running management http server in an infinite loop, able to serve concurrent requests
pub async fn run(self: Arc<Self>) {
let lsnr = lsnr_from_path(self.s_addr.clone()).await.unwrap();
// start an infinate loop, which serves the incomming uds stream
tokio::task::spawn(async move {
loop {
let (stream, _) = lsnr.accept().await.unwrap();
// spawn a light weight thread to multiplex to the handler
tokio::task::spawn(async move {
if let Err(err) = Http::new()
.serve_connection(stream, service_fn(handler_mux))
.await
{
warn!(sl!(), "Failed to serve connection: {:?}", err);
}
});
}
});
loop {
let (stream, _) = lsnr.accept().await.unwrap();
let me = self.clone();
// spawn a light weight thread to multiplex to the handler
tokio::task::spawn(async move {
if let Err(err) = Http::new()
.serve_connection(
stream,
service_fn(|request| handler_mux(me.sandbox.clone(), request)),
)
.await
{
warn!(sl!(), "Failed to serve connection: {:?}", err);
}
});
}
}
}
@ -86,12 +92,14 @@ pub fn mgmt_socket_addr(sid: String) -> String {
// from path, return a unix listener corresponding to that path,
// if the path(socket file) is not created, we create that here
async fn lsnr_from_path(path: String) -> Result<UnixListener> {
// create the socket if not present
let trim_path = path.strip_prefix("unix:").context("trim path")?;
let file_path = Path::new("/").join(trim_path);
let file_path = file_path.as_path();
if let Some(parent_dir) = file_path.parent() {
fs::create_dir_all(parent_dir).context("create parent dir")?;
}
// bind the socket and return the listener
info!(sl!(), "mgmt-svr: binding to path {}", path);
UnixListener::bind(file_path).context("bind address")
}

View File

@ -262,6 +262,10 @@ impl Sandbox for VirtSandbox {
// TODO: cleanup other snadbox resource
Ok(())
}
async fn agent_sock(&self) -> Result<String> {
self.agent.agent_sock().await
}
}
#[async_trait]