mirror of
https://github.com/kata-containers/kata-containers.git
synced 2025-04-29 12:14:48 +00:00
runtime-rs: shim management - agent-url
Add agent-url to its handler. The general framework of registering URL handlers is done. Fixes: #5114 Signed-off-by: Ji-Xinyou <jerryji0414@outlook.com>
This commit is contained in:
parent
59aeb776b0
commit
e891295e10
@ -40,6 +40,10 @@ impl AgentManager for KataAgent {
|
||||
self.stop_log_forwarder().await;
|
||||
}
|
||||
|
||||
async fn agent_sock(&self) -> Result<String> {
|
||||
self.agent_sock().await
|
||||
}
|
||||
|
||||
async fn agent_config(&self) -> AgentConfig {
|
||||
self.agent_config().await
|
||||
}
|
||||
|
@ -127,6 +127,11 @@ impl KataAgent {
|
||||
inner.log_forwarder.stop();
|
||||
}
|
||||
|
||||
pub(crate) async fn agent_sock(&self) -> Result<String> {
|
||||
let inner = self.inner.lock().await;
|
||||
Ok(inner.socket_address.clone())
|
||||
}
|
||||
|
||||
pub(crate) async fn agent_config(&self) -> AgentConfig {
|
||||
let inner = self.inner.lock().await;
|
||||
inner.config.clone()
|
||||
|
@ -38,6 +38,7 @@ pub trait AgentManager: Send + Sync {
|
||||
async fn start(&self, address: &str) -> Result<()>;
|
||||
async fn stop(&self);
|
||||
|
||||
async fn agent_sock(&self) -> Result<String>;
|
||||
async fn agent_config(&self) -> AgentConfig;
|
||||
}
|
||||
|
||||
|
@ -13,4 +13,7 @@ pub trait Sandbox: Send + Sync {
|
||||
async fn stop(&self) -> Result<()>;
|
||||
async fn cleanup(&self, container_id: &str) -> Result<()>;
|
||||
async fn shutdown(&self) -> Result<()>;
|
||||
|
||||
// agent function
|
||||
async fn agent_sock(&self) -> Result<String>;
|
||||
}
|
||||
|
@ -111,8 +111,13 @@ impl RuntimeHandlerManagerInner {
|
||||
|
||||
// the sandbox creation can reach here only once and the sandbox is created
|
||||
// so we can safely create the shim management socket right now
|
||||
let shim_mgmt_svr = MgmtServer::new(&self.id);
|
||||
shim_mgmt_svr.run().await;
|
||||
// the unwrap here is safe because the runtime handler is correctly created
|
||||
let shim_mgmt_svr = MgmtServer::new(
|
||||
&self.id,
|
||||
self.runtime_instance.as_ref().unwrap().sandbox.clone(),
|
||||
);
|
||||
tokio::task::spawn(Arc::new(shim_mgmt_svr).run());
|
||||
info!(sl!(), "shim management http server starts");
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
5
src/runtime-rs/crates/runtimes/src/shim_mgmt/client.rs
Normal file
5
src/runtime-rs/crates/runtimes/src/shim_mgmt/client.rs
Normal file
@ -0,0 +1,5 @@
|
||||
// Copyright (c) 2019-2022 Alibaba Cloud
|
||||
// Copyright (c) 2019-2022 Ant Group
|
||||
//
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
//
|
@ -4,23 +4,24 @@
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
//
|
||||
|
||||
// This defines the handler corresponding to the url
|
||||
// when a request is sent to destined url, the handler
|
||||
// function should be invoked, and the corresponding
|
||||
// data will be in the response's body
|
||||
//
|
||||
// NOTE: ALL HANDLER SHOULD BE ASYNC UNDER ROUTERIFY
|
||||
// This defines the handlers corresponding to the url when a request is sent to destined url,
|
||||
// the handler function should be invoked, and the corresponding data will be in the response
|
||||
|
||||
use common::Sandbox;
|
||||
use hyper::{Body, Method, Request, Response, Result, StatusCode};
|
||||
use std::sync::Arc;
|
||||
|
||||
use super::server::AGENT_URL;
|
||||
|
||||
// main router for response, this works as a multiplexer on
|
||||
// http arrival which invokes the corresponding handler function
|
||||
pub(crate) async fn handler_mux(req: Request<Body>) -> Result<Response<Body>> {
|
||||
pub(crate) async fn handler_mux(
|
||||
sandbox: Arc<dyn Sandbox>,
|
||||
req: Request<Body>,
|
||||
) -> Result<Response<Body>> {
|
||||
info!(sl!(), "mgmt-svr(mux): recv req {:?}", req);
|
||||
match (req.method(), req.uri().path()) {
|
||||
(&Method::GET, AGENT_URL) => agent_url_handler(req).await,
|
||||
(&Method::GET, AGENT_URL) => agent_url_handler(sandbox, req).await,
|
||||
_ => Ok(not_found(req).await),
|
||||
}
|
||||
}
|
||||
@ -34,7 +35,13 @@ async fn not_found(_req: Request<Body>) -> Response<Body> {
|
||||
}
|
||||
|
||||
// returns the url for agent
|
||||
async fn agent_url_handler(_req: Request<Body>) -> Result<Response<Body>> {
|
||||
// todo
|
||||
Ok(Response::new(Body::from("")))
|
||||
async fn agent_url_handler(
|
||||
sandbox: Arc<dyn Sandbox>,
|
||||
_req: Request<Body>,
|
||||
) -> Result<Response<Body>> {
|
||||
let agent_sock = sandbox
|
||||
.agent_sock()
|
||||
.await
|
||||
.unwrap_or_else(|_| String::from(""));
|
||||
Ok(Response::new(Body::from(agent_sock)))
|
||||
}
|
||||
|
@ -6,3 +6,4 @@
|
||||
|
||||
mod handlers;
|
||||
pub mod server;
|
||||
pub mod client;
|
||||
|
@ -11,9 +11,10 @@
|
||||
|
||||
#![allow(dead_code)] // some url's handler are *to be* developed
|
||||
|
||||
use std::{fs, path::Path};
|
||||
use std::{fs, path::Path, sync::Arc};
|
||||
|
||||
use anyhow::{Context, Result};
|
||||
use common::Sandbox;
|
||||
use hyper::{server::conn::Http, service::service_fn};
|
||||
use tokio::net::UnixListener;
|
||||
|
||||
@ -29,42 +30,47 @@ pub(crate) const METRICS_URL: &str = "/metrics";
|
||||
|
||||
const SHIM_MGMT_SOCK_NAME: &str = "shim-monitor.sock";
|
||||
|
||||
// The management server
|
||||
#[derive(Debug)]
|
||||
/// The shim management server instance
|
||||
pub struct MgmtServer {
|
||||
// socket address
|
||||
/// socket address(with prefix like hvsock://)
|
||||
pub s_addr: String,
|
||||
|
||||
/// The sandbox instance
|
||||
pub sandbox: Arc<dyn Sandbox>,
|
||||
}
|
||||
|
||||
impl MgmtServer {
|
||||
pub fn new(sid: &str) -> Self {
|
||||
/// construct a new management server
|
||||
pub fn new(sid: &str, sandbox: Arc<dyn Sandbox>) -> Self {
|
||||
Self {
|
||||
s_addr: mgmt_socket_addr(sid.to_owned()),
|
||||
sandbox,
|
||||
}
|
||||
}
|
||||
|
||||
// TODO(when metrics is supported): write metric addresses to fs
|
||||
// TODO(when metrics is supported): register shim metrics
|
||||
// TODO(when metrics is supported): register sandbox metrics
|
||||
// start a new thread, running management http server in a dead loop
|
||||
pub async fn run(&self) {
|
||||
// running management http server in an infinite loop, able to serve concurrent requests
|
||||
pub async fn run(self: Arc<Self>) {
|
||||
let lsnr = lsnr_from_path(self.s_addr.clone()).await.unwrap();
|
||||
|
||||
// start an infinate loop, which serves the incomming uds stream
|
||||
tokio::task::spawn(async move {
|
||||
loop {
|
||||
let (stream, _) = lsnr.accept().await.unwrap();
|
||||
// spawn a light weight thread to multiplex to the handler
|
||||
tokio::task::spawn(async move {
|
||||
if let Err(err) = Http::new()
|
||||
.serve_connection(stream, service_fn(handler_mux))
|
||||
.await
|
||||
{
|
||||
warn!(sl!(), "Failed to serve connection: {:?}", err);
|
||||
}
|
||||
});
|
||||
}
|
||||
});
|
||||
loop {
|
||||
let (stream, _) = lsnr.accept().await.unwrap();
|
||||
let me = self.clone();
|
||||
// spawn a light weight thread to multiplex to the handler
|
||||
tokio::task::spawn(async move {
|
||||
if let Err(err) = Http::new()
|
||||
.serve_connection(
|
||||
stream,
|
||||
service_fn(|request| handler_mux(me.sandbox.clone(), request)),
|
||||
)
|
||||
.await
|
||||
{
|
||||
warn!(sl!(), "Failed to serve connection: {:?}", err);
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -86,12 +92,14 @@ pub fn mgmt_socket_addr(sid: String) -> String {
|
||||
// from path, return a unix listener corresponding to that path,
|
||||
// if the path(socket file) is not created, we create that here
|
||||
async fn lsnr_from_path(path: String) -> Result<UnixListener> {
|
||||
// create the socket if not present
|
||||
let trim_path = path.strip_prefix("unix:").context("trim path")?;
|
||||
let file_path = Path::new("/").join(trim_path);
|
||||
let file_path = file_path.as_path();
|
||||
if let Some(parent_dir) = file_path.parent() {
|
||||
fs::create_dir_all(parent_dir).context("create parent dir")?;
|
||||
}
|
||||
// bind the socket and return the listener
|
||||
info!(sl!(), "mgmt-svr: binding to path {}", path);
|
||||
UnixListener::bind(file_path).context("bind address")
|
||||
}
|
||||
|
@ -262,6 +262,10 @@ impl Sandbox for VirtSandbox {
|
||||
// TODO: cleanup other snadbox resource
|
||||
Ok(())
|
||||
}
|
||||
|
||||
async fn agent_sock(&self) -> Result<String> {
|
||||
self.agent.agent_sock().await
|
||||
}
|
||||
}
|
||||
|
||||
#[async_trait]
|
||||
|
Loading…
Reference in New Issue
Block a user