mirror of
https://github.com/kata-containers/kata-containers.git
synced 2025-07-05 03:26:37 +00:00
runtime-rs: shim management - agent-url
Add agent-url to its handler. The general framework of registering URL handlers is done. Fixes: #5114 Signed-off-by: Ji-Xinyou <jerryji0414@outlook.com>
This commit is contained in:
parent
59aeb776b0
commit
e891295e10
@ -40,6 +40,10 @@ impl AgentManager for KataAgent {
|
|||||||
self.stop_log_forwarder().await;
|
self.stop_log_forwarder().await;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn agent_sock(&self) -> Result<String> {
|
||||||
|
self.agent_sock().await
|
||||||
|
}
|
||||||
|
|
||||||
async fn agent_config(&self) -> AgentConfig {
|
async fn agent_config(&self) -> AgentConfig {
|
||||||
self.agent_config().await
|
self.agent_config().await
|
||||||
}
|
}
|
||||||
|
@ -127,6 +127,11 @@ impl KataAgent {
|
|||||||
inner.log_forwarder.stop();
|
inner.log_forwarder.stop();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub(crate) async fn agent_sock(&self) -> Result<String> {
|
||||||
|
let inner = self.inner.lock().await;
|
||||||
|
Ok(inner.socket_address.clone())
|
||||||
|
}
|
||||||
|
|
||||||
pub(crate) async fn agent_config(&self) -> AgentConfig {
|
pub(crate) async fn agent_config(&self) -> AgentConfig {
|
||||||
let inner = self.inner.lock().await;
|
let inner = self.inner.lock().await;
|
||||||
inner.config.clone()
|
inner.config.clone()
|
||||||
|
@ -38,6 +38,7 @@ pub trait AgentManager: Send + Sync {
|
|||||||
async fn start(&self, address: &str) -> Result<()>;
|
async fn start(&self, address: &str) -> Result<()>;
|
||||||
async fn stop(&self);
|
async fn stop(&self);
|
||||||
|
|
||||||
|
async fn agent_sock(&self) -> Result<String>;
|
||||||
async fn agent_config(&self) -> AgentConfig;
|
async fn agent_config(&self) -> AgentConfig;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -13,4 +13,7 @@ pub trait Sandbox: Send + Sync {
|
|||||||
async fn stop(&self) -> Result<()>;
|
async fn stop(&self) -> Result<()>;
|
||||||
async fn cleanup(&self, container_id: &str) -> Result<()>;
|
async fn cleanup(&self, container_id: &str) -> Result<()>;
|
||||||
async fn shutdown(&self) -> Result<()>;
|
async fn shutdown(&self) -> Result<()>;
|
||||||
|
|
||||||
|
// agent function
|
||||||
|
async fn agent_sock(&self) -> Result<String>;
|
||||||
}
|
}
|
||||||
|
@ -111,8 +111,13 @@ impl RuntimeHandlerManagerInner {
|
|||||||
|
|
||||||
// the sandbox creation can reach here only once and the sandbox is created
|
// the sandbox creation can reach here only once and the sandbox is created
|
||||||
// so we can safely create the shim management socket right now
|
// so we can safely create the shim management socket right now
|
||||||
let shim_mgmt_svr = MgmtServer::new(&self.id);
|
// the unwrap here is safe because the runtime handler is correctly created
|
||||||
shim_mgmt_svr.run().await;
|
let shim_mgmt_svr = MgmtServer::new(
|
||||||
|
&self.id,
|
||||||
|
self.runtime_instance.as_ref().unwrap().sandbox.clone(),
|
||||||
|
);
|
||||||
|
tokio::task::spawn(Arc::new(shim_mgmt_svr).run());
|
||||||
|
info!(sl!(), "shim management http server starts");
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
5
src/runtime-rs/crates/runtimes/src/shim_mgmt/client.rs
Normal file
5
src/runtime-rs/crates/runtimes/src/shim_mgmt/client.rs
Normal file
@ -0,0 +1,5 @@
|
|||||||
|
// Copyright (c) 2019-2022 Alibaba Cloud
|
||||||
|
// Copyright (c) 2019-2022 Ant Group
|
||||||
|
//
|
||||||
|
// SPDX-License-Identifier: Apache-2.0
|
||||||
|
//
|
@ -4,23 +4,24 @@
|
|||||||
// SPDX-License-Identifier: Apache-2.0
|
// SPDX-License-Identifier: Apache-2.0
|
||||||
//
|
//
|
||||||
|
|
||||||
// This defines the handler corresponding to the url
|
// This defines the handlers corresponding to the url when a request is sent to destined url,
|
||||||
// when a request is sent to destined url, the handler
|
// the handler function should be invoked, and the corresponding data will be in the response
|
||||||
// function should be invoked, and the corresponding
|
|
||||||
// data will be in the response's body
|
|
||||||
//
|
|
||||||
// NOTE: ALL HANDLER SHOULD BE ASYNC UNDER ROUTERIFY
|
|
||||||
|
|
||||||
|
use common::Sandbox;
|
||||||
use hyper::{Body, Method, Request, Response, Result, StatusCode};
|
use hyper::{Body, Method, Request, Response, Result, StatusCode};
|
||||||
|
use std::sync::Arc;
|
||||||
|
|
||||||
use super::server::AGENT_URL;
|
use super::server::AGENT_URL;
|
||||||
|
|
||||||
// main router for response, this works as a multiplexer on
|
// main router for response, this works as a multiplexer on
|
||||||
// http arrival which invokes the corresponding handler function
|
// http arrival which invokes the corresponding handler function
|
||||||
pub(crate) async fn handler_mux(req: Request<Body>) -> Result<Response<Body>> {
|
pub(crate) async fn handler_mux(
|
||||||
|
sandbox: Arc<dyn Sandbox>,
|
||||||
|
req: Request<Body>,
|
||||||
|
) -> Result<Response<Body>> {
|
||||||
info!(sl!(), "mgmt-svr(mux): recv req {:?}", req);
|
info!(sl!(), "mgmt-svr(mux): recv req {:?}", req);
|
||||||
match (req.method(), req.uri().path()) {
|
match (req.method(), req.uri().path()) {
|
||||||
(&Method::GET, AGENT_URL) => agent_url_handler(req).await,
|
(&Method::GET, AGENT_URL) => agent_url_handler(sandbox, req).await,
|
||||||
_ => Ok(not_found(req).await),
|
_ => Ok(not_found(req).await),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -34,7 +35,13 @@ async fn not_found(_req: Request<Body>) -> Response<Body> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// returns the url for agent
|
// returns the url for agent
|
||||||
async fn agent_url_handler(_req: Request<Body>) -> Result<Response<Body>> {
|
async fn agent_url_handler(
|
||||||
// todo
|
sandbox: Arc<dyn Sandbox>,
|
||||||
Ok(Response::new(Body::from("")))
|
_req: Request<Body>,
|
||||||
|
) -> Result<Response<Body>> {
|
||||||
|
let agent_sock = sandbox
|
||||||
|
.agent_sock()
|
||||||
|
.await
|
||||||
|
.unwrap_or_else(|_| String::from(""));
|
||||||
|
Ok(Response::new(Body::from(agent_sock)))
|
||||||
}
|
}
|
||||||
|
@ -6,3 +6,4 @@
|
|||||||
|
|
||||||
mod handlers;
|
mod handlers;
|
||||||
pub mod server;
|
pub mod server;
|
||||||
|
pub mod client;
|
||||||
|
@ -11,9 +11,10 @@
|
|||||||
|
|
||||||
#![allow(dead_code)] // some url's handler are *to be* developed
|
#![allow(dead_code)] // some url's handler are *to be* developed
|
||||||
|
|
||||||
use std::{fs, path::Path};
|
use std::{fs, path::Path, sync::Arc};
|
||||||
|
|
||||||
use anyhow::{Context, Result};
|
use anyhow::{Context, Result};
|
||||||
|
use common::Sandbox;
|
||||||
use hyper::{server::conn::Http, service::service_fn};
|
use hyper::{server::conn::Http, service::service_fn};
|
||||||
use tokio::net::UnixListener;
|
use tokio::net::UnixListener;
|
||||||
|
|
||||||
@ -29,42 +30,47 @@ pub(crate) const METRICS_URL: &str = "/metrics";
|
|||||||
|
|
||||||
const SHIM_MGMT_SOCK_NAME: &str = "shim-monitor.sock";
|
const SHIM_MGMT_SOCK_NAME: &str = "shim-monitor.sock";
|
||||||
|
|
||||||
// The management server
|
/// The shim management server instance
|
||||||
#[derive(Debug)]
|
|
||||||
pub struct MgmtServer {
|
pub struct MgmtServer {
|
||||||
// socket address
|
/// socket address(with prefix like hvsock://)
|
||||||
pub s_addr: String,
|
pub s_addr: String,
|
||||||
|
|
||||||
|
/// The sandbox instance
|
||||||
|
pub sandbox: Arc<dyn Sandbox>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl MgmtServer {
|
impl MgmtServer {
|
||||||
pub fn new(sid: &str) -> Self {
|
/// construct a new management server
|
||||||
|
pub fn new(sid: &str, sandbox: Arc<dyn Sandbox>) -> Self {
|
||||||
Self {
|
Self {
|
||||||
s_addr: mgmt_socket_addr(sid.to_owned()),
|
s_addr: mgmt_socket_addr(sid.to_owned()),
|
||||||
|
sandbox,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TODO(when metrics is supported): write metric addresses to fs
|
// TODO(when metrics is supported): write metric addresses to fs
|
||||||
// TODO(when metrics is supported): register shim metrics
|
// TODO(when metrics is supported): register shim metrics
|
||||||
// TODO(when metrics is supported): register sandbox metrics
|
// TODO(when metrics is supported): register sandbox metrics
|
||||||
// start a new thread, running management http server in a dead loop
|
// running management http server in an infinite loop, able to serve concurrent requests
|
||||||
pub async fn run(&self) {
|
pub async fn run(self: Arc<Self>) {
|
||||||
let lsnr = lsnr_from_path(self.s_addr.clone()).await.unwrap();
|
let lsnr = lsnr_from_path(self.s_addr.clone()).await.unwrap();
|
||||||
|
|
||||||
// start an infinate loop, which serves the incomming uds stream
|
// start an infinate loop, which serves the incomming uds stream
|
||||||
tokio::task::spawn(async move {
|
|
||||||
loop {
|
loop {
|
||||||
let (stream, _) = lsnr.accept().await.unwrap();
|
let (stream, _) = lsnr.accept().await.unwrap();
|
||||||
|
let me = self.clone();
|
||||||
// spawn a light weight thread to multiplex to the handler
|
// spawn a light weight thread to multiplex to the handler
|
||||||
tokio::task::spawn(async move {
|
tokio::task::spawn(async move {
|
||||||
if let Err(err) = Http::new()
|
if let Err(err) = Http::new()
|
||||||
.serve_connection(stream, service_fn(handler_mux))
|
.serve_connection(
|
||||||
|
stream,
|
||||||
|
service_fn(|request| handler_mux(me.sandbox.clone(), request)),
|
||||||
|
)
|
||||||
.await
|
.await
|
||||||
{
|
{
|
||||||
warn!(sl!(), "Failed to serve connection: {:?}", err);
|
warn!(sl!(), "Failed to serve connection: {:?}", err);
|
||||||
}
|
}
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
});
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -86,12 +92,14 @@ pub fn mgmt_socket_addr(sid: String) -> String {
|
|||||||
// from path, return a unix listener corresponding to that path,
|
// from path, return a unix listener corresponding to that path,
|
||||||
// if the path(socket file) is not created, we create that here
|
// if the path(socket file) is not created, we create that here
|
||||||
async fn lsnr_from_path(path: String) -> Result<UnixListener> {
|
async fn lsnr_from_path(path: String) -> Result<UnixListener> {
|
||||||
|
// create the socket if not present
|
||||||
let trim_path = path.strip_prefix("unix:").context("trim path")?;
|
let trim_path = path.strip_prefix("unix:").context("trim path")?;
|
||||||
let file_path = Path::new("/").join(trim_path);
|
let file_path = Path::new("/").join(trim_path);
|
||||||
let file_path = file_path.as_path();
|
let file_path = file_path.as_path();
|
||||||
if let Some(parent_dir) = file_path.parent() {
|
if let Some(parent_dir) = file_path.parent() {
|
||||||
fs::create_dir_all(parent_dir).context("create parent dir")?;
|
fs::create_dir_all(parent_dir).context("create parent dir")?;
|
||||||
}
|
}
|
||||||
|
// bind the socket and return the listener
|
||||||
info!(sl!(), "mgmt-svr: binding to path {}", path);
|
info!(sl!(), "mgmt-svr: binding to path {}", path);
|
||||||
UnixListener::bind(file_path).context("bind address")
|
UnixListener::bind(file_path).context("bind address")
|
||||||
}
|
}
|
||||||
|
@ -262,6 +262,10 @@ impl Sandbox for VirtSandbox {
|
|||||||
// TODO: cleanup other snadbox resource
|
// TODO: cleanup other snadbox resource
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn agent_sock(&self) -> Result<String> {
|
||||||
|
self.agent.agent_sock().await
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
#[async_trait]
|
#[async_trait]
|
||||||
|
Loading…
Reference in New Issue
Block a user