runtime-rs: agent implements

Responsible for communicating with the agent, such as kata-agent in the VM

Fixes: #3785
Signed-off-by: Quanwei Zhou <quanweiZhou@linux.alibaba.com>
This commit is contained in:
Quanwei Zhou
2021-12-03 18:53:48 +08:00
committed by Fupan Li
parent d3da156eea
commit 4296e3069f
14 changed files with 2317 additions and 34 deletions

View File

@@ -0,0 +1,28 @@
[package]
name = "agent"
version = "0.1.0"
authors = ["The Kata Containers community <kata-dev@lists.katacontainers.io>"]
edition = "2018"
[dev-dependencies]
futures = "0.1.27"
[dependencies]
anyhow = "1.0.26"
async-trait = "0.1.48"
log = "0.4.14"
protobuf = "2.23.0"
serde = { version = "^1.0", features = ["derive"] }
serde_json = ">=1.0.9"
slog = "2.5.2"
slog-scope = "4.4.0"
ttrpc = { version = "0.6.0" }
tokio = { version = "1.8.0", features = ["fs", "rt"] }
url = "2.2.2"
kata-types = { path = "../../../libs/kata-types"}
oci = { path = "../../../libs/oci" }
protocols = { path = "../../../libs/protocols", features=["async"] }
[features]
default = []

View File

@@ -0,0 +1,110 @@
// Copyright (c) 2019-2022 Alibaba Cloud
// Copyright (c) 2019-2022 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//
use anyhow::{Context, Result};
use async_trait::async_trait;
use ttrpc::context as ttrpc_ctx;
use crate::{kata::KataAgent, Agent, AgentManager, HealthService};
/// millisecond to nanosecond
const MILLISECOND_TO_NANOSECOND: i64 = 1_000_000;
/// new ttrpc context with timeout
fn new_ttrpc_ctx(timeout: i64) -> ttrpc_ctx::Context {
ttrpc_ctx::with_timeout(timeout)
}
#[async_trait]
impl AgentManager for KataAgent {
async fn set_socket_address(&self, address: &str) -> Result<()> {
let mut inner = self.inner.lock().await;
inner.socket_address = address.to_string();
Ok(())
}
async fn start(&self) -> Result<()> {
info!(sl!(), "begin to connect agent");
self.connect_agent_server()
.await
.context("connect agent server")?;
self.start_log_forwarder()
.await
.context("connect log forwarder")?;
Ok(())
}
async fn stop(&self) {
self.stop_log_forwarder().await;
}
}
// implement for health service
macro_rules! impl_health_service {
($($name: tt | $req: ty | $resp: ty),*) => {
#[async_trait]
impl HealthService for KataAgent {
$(async fn $name(&self, req: $req) -> Result<$resp> {
let r = req.into();
let (mut client, timeout, _) = self.get_health_client().await.context("get health client")?;
let resp = client.$name(new_ttrpc_ctx(timeout * MILLISECOND_TO_NANOSECOND), &r).await?;
Ok(resp.into())
})*
}
};
}
impl_health_service!(
check | crate::CheckRequest | crate::HealthCheckResponse,
version | crate::CheckRequest | crate::VersionCheckResponse
);
macro_rules! impl_agent {
($($name: tt | $req: ty | $resp: ty | $new_timeout: expr),*) => {
#[async_trait]
impl Agent for KataAgent {
$(async fn $name(&self, req: $req) -> Result<$resp> {
let r = req.into();
let (mut client, mut timeout, _) = self.get_agent_client().await.context("get client")?;
// update new timeout
if let Some(v) = $new_timeout {
timeout = v;
}
let resp = client.$name(new_ttrpc_ctx(timeout * MILLISECOND_TO_NANOSECOND), &r).await?;
Ok(resp.into())
})*
}
};
}
impl_agent!(
create_container | crate::CreateContainerRequest | crate::Empty | None,
start_container | crate::ContainerID | crate::Empty | None,
remove_container | crate::RemoveContainerRequest | crate::Empty | None,
exec_process | crate::ExecProcessRequest | crate::Empty | None,
signal_process | crate::SignalProcessRequest | crate::Empty | None,
wait_process | crate::WaitProcessRequest | crate::WaitProcessResponse | Some(0),
update_container | crate::UpdateContainerRequest | crate::Empty | None,
stats_container | crate::ContainerID | crate::StatsContainerResponse | None,
pause_container | crate::ContainerID | crate::Empty | None,
resume_container | crate::ContainerID | crate::Empty | None,
write_stdin | crate::WriteStreamRequest | crate::WriteStreamResponse | None,
read_stdout | crate::ReadStreamRequest | crate::ReadStreamResponse | None,
read_stderr | crate::ReadStreamRequest | crate::ReadStreamResponse | None,
close_stdin | crate::CloseStdinRequest | crate::Empty | None,
tty_win_resize | crate::TtyWinResizeRequest | crate::Empty | None,
update_interface | crate::UpdateInterfaceRequest | crate::Interface | None,
update_routes | crate::UpdateRoutesRequest | crate::Routes | None,
add_arp_neighbors | crate::AddArpNeighborRequest | crate::Empty | None,
list_interfaces | crate::Empty | crate::Interfaces | None,
list_routes | crate::Empty | crate::Routes | None,
create_sandbox | crate::CreateSandboxRequest | crate::Empty | None,
destroy_sandbox | crate::Empty | crate::Empty | None,
copy_file | crate::CopyFileRequest | crate::Empty | None,
get_oom_event | crate::Empty | crate::OomEventResponse | Some(0)
);

View File

@@ -0,0 +1,123 @@
// Copyright (c) 2019-2022 Alibaba Cloud
// Copyright (c) 2019-2022 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//
mod agent;
mod trans;
use std::os::unix::io::{IntoRawFd, RawFd};
use anyhow::{Context, Result};
use kata_types::config::Agent as AgentConfig;
use protocols::{agent_ttrpc_async as agent_ttrpc, health_ttrpc_async as health_ttrpc};
use tokio::sync::Mutex;
use ttrpc::asynchronous::Client;
use crate::{log_forwarder::LogForwarder, sock};
// https://github.com/firecracker-microvm/firecracker/blob/master/docs/vsock.md
#[derive(Debug, Default)]
pub struct Vsock {
pub context_id: u64,
pub port: u32,
}
pub(crate) struct KataAgentInner {
/// TTRPC client
pub client: Option<Client>,
/// Client fd
pub client_fd: RawFd,
/// Unix domain socket address
pub socket_address: String,
/// Agent config
config: AgentConfig,
/// Log forwarder
log_forwarder: LogForwarder,
}
unsafe impl Send for KataAgent {}
unsafe impl Sync for KataAgent {}
pub struct KataAgent {
pub(crate) inner: Mutex<KataAgentInner>,
}
impl KataAgent {
pub fn new(config: AgentConfig) -> Self {
KataAgent {
inner: Mutex::new(KataAgentInner {
client: None,
client_fd: -1,
socket_address: "".to_string(),
config,
log_forwarder: LogForwarder::new(),
}),
}
}
pub async fn get_health_client(&self) -> Option<(health_ttrpc::HealthClient, i64, RawFd)> {
let inner = self.inner.lock().await;
inner.client.as_ref().map(|c| {
(
health_ttrpc::HealthClient::new(c.clone()),
inner.config.health_check_request_timeout_ms as i64,
inner.client_fd,
)
})
}
pub async fn get_agent_client(&self) -> Option<(agent_ttrpc::AgentServiceClient, i64, RawFd)> {
let inner = self.inner.lock().await;
inner.client.as_ref().map(|c| {
(
agent_ttrpc::AgentServiceClient::new(c.clone()),
inner.config.request_timeout_ms as i64,
inner.client_fd,
)
})
}
pub(crate) async fn connect_agent_server(&self) -> Result<()> {
let mut inner = self.inner.lock().await;
let config = sock::ConnectConfig::new(
inner.config.dial_timeout_ms as u64,
inner.config.reconnect_timeout_ms as u64,
);
let sock =
sock::new(&inner.socket_address, inner.config.server_port).context("new sock")?;
let stream = sock.connect(&config).await.context("connect")?;
let fd = stream.into_raw_fd();
info!(sl!(), "get stream raw fd {:?}", fd);
let c = Client::new(fd);
inner.client = Some(c);
inner.client_fd = fd;
Ok(())
}
pub(crate) async fn start_log_forwarder(&self) -> Result<()> {
let mut inner = self.inner.lock().await;
let config = sock::ConnectConfig::new(
inner.config.dial_timeout_ms as u64,
inner.config.reconnect_timeout_ms as u64,
);
let address = inner.socket_address.clone();
let port = inner.config.log_port;
inner
.log_forwarder
.start(&address, port, config)
.await
.context("start log forwarder")?;
Ok(())
}
pub(crate) async fn stop_log_forwarder(&self) {
let mut inner = self.inner.lock().await;
inner.log_forwarder.stop();
}
}

View File

@@ -0,0 +1,794 @@
// Copyright (c) 2019-2022 Alibaba Cloud
// Copyright (c) 2019-2022 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//
use std::convert::Into;
use protocols::{
agent::{self, OOMEvent},
empty, health, types,
};
use crate::{
types::{
ARPNeighbor, ARPNeighbors, AddArpNeighborRequest, AgentDetails, BlkioStats,
BlkioStatsEntry, CgroupStats, CheckRequest, CloseStdinRequest, ContainerID,
CopyFileRequest, CpuStats, CpuUsage, CreateContainerRequest, CreateSandboxRequest, Device,
Empty, ExecProcessRequest, GuestDetailsResponse, HealthCheckResponse, HugetlbStats,
IPAddress, IPFamily, Interface, Interfaces, KernelModule, MemHotplugByProbeRequest,
MemoryData, MemoryStats, NetworkStats, OnlineCPUMemRequest, PidsStats, ReadStreamRequest,
ReadStreamResponse, RemoveContainerRequest, ReseedRandomDevRequest, Route, Routes,
SetGuestDateTimeRequest, SignalProcessRequest, StatsContainerResponse, Storage, StringUser,
ThrottlingData, TtyWinResizeRequest, UpdateContainerRequest, UpdateInterfaceRequest,
UpdateRoutesRequest, VersionCheckResponse, WaitProcessRequest, WriteStreamRequest,
},
OomEventResponse, WaitProcessResponse, WriteStreamResponse,
};
fn from_vec<F: Into<T>, T: Sized>(from: Vec<F>) -> ::protobuf::RepeatedField<T> {
let mut to: Vec<T> = vec![];
for data in from {
to.push(data.into());
}
::protobuf::RepeatedField::from_vec(to)
}
fn into_vec<F: Sized + Clone, T: From<F>>(from: ::protobuf::RepeatedField<F>) -> Vec<T> {
let mut to: Vec<T> = vec![];
for data in from.to_vec() {
to.push(data.into());
}
to
}
fn from_option<F: Sized, T: From<F>>(from: Option<F>) -> ::protobuf::SingularPtrField<T> {
match from {
Some(f) => ::protobuf::SingularPtrField::from_option(Some(T::from(f))),
None => ::protobuf::SingularPtrField::none(),
}
}
fn into_option<F: Into<T>, T: Sized>(from: ::protobuf::SingularPtrField<F>) -> Option<T> {
from.into_option().map(|f| f.into())
}
fn into_hash_map<F: Into<T>, T>(
from: std::collections::HashMap<String, F>,
) -> std::collections::HashMap<String, T> {
let mut to: std::collections::HashMap<String, T> = Default::default();
for (key, value) in from {
to.insert(key, value.into());
}
to
}
impl From<empty::Empty> for Empty {
fn from(_: empty::Empty) -> Self {
Self {}
}
}
impl From<StringUser> for agent::StringUser {
fn from(from: StringUser) -> Self {
Self {
uid: from.uid,
gid: from.gid,
additionalGids: ::protobuf::RepeatedField::from_vec(from.additional_gids),
unknown_fields: Default::default(),
cached_size: Default::default(),
}
}
}
impl From<Device> for agent::Device {
fn from(from: Device) -> Self {
Self {
id: from.id,
field_type: from.field_type,
vm_path: from.vm_path,
container_path: from.container_path,
options: from_vec(from.options),
unknown_fields: Default::default(),
cached_size: Default::default(),
}
}
}
impl From<Storage> for agent::Storage {
fn from(from: Storage) -> Self {
Self {
driver: from.driver,
driver_options: from_vec(from.driver_options),
source: from.source,
fstype: from.fs_type,
options: from_vec(from.options),
mount_point: from.mount_point,
unknown_fields: Default::default(),
cached_size: Default::default(),
}
}
}
impl From<KernelModule> for agent::KernelModule {
fn from(from: KernelModule) -> Self {
Self {
name: from.name,
parameters: from_vec(from.parameters),
unknown_fields: Default::default(),
cached_size: Default::default(),
}
}
}
impl From<IPFamily> for types::IPFamily {
fn from(from: IPFamily) -> Self {
if from == IPFamily::V4 {
types::IPFamily::v4
} else {
types::IPFamily::v6
}
}
}
impl From<types::IPFamily> for IPFamily {
fn from(src: types::IPFamily) -> Self {
match src {
types::IPFamily::v4 => IPFamily::V4,
types::IPFamily::v6 => IPFamily::V6,
}
}
}
impl From<IPAddress> for types::IPAddress {
fn from(from: IPAddress) -> Self {
Self {
family: from.family.into(),
address: from.address,
mask: from.mask,
unknown_fields: Default::default(),
cached_size: Default::default(),
}
}
}
impl From<types::IPAddress> for IPAddress {
fn from(src: types::IPAddress) -> Self {
Self {
family: src.family.into(),
address: "".to_string(),
mask: "".to_string(),
}
}
}
impl From<Interface> for types::Interface {
fn from(from: Interface) -> Self {
Self {
device: from.device,
name: from.name,
IPAddresses: from_vec(from.ip_addresses),
mtu: from.mtu,
hwAddr: from.hw_addr,
pciPath: from.pci_addr,
field_type: from.field_type,
raw_flags: from.raw_flags,
unknown_fields: Default::default(),
cached_size: Default::default(),
}
}
}
impl From<types::Interface> for Interface {
fn from(src: types::Interface) -> Self {
Self {
device: src.device,
name: src.name,
ip_addresses: into_vec(src.IPAddresses),
mtu: src.mtu,
hw_addr: src.hwAddr,
pci_addr: src.pciPath,
field_type: src.field_type,
raw_flags: src.raw_flags,
}
}
}
impl From<agent::Interfaces> for Interfaces {
fn from(src: agent::Interfaces) -> Self {
Self {
interfaces: into_vec(src.Interfaces),
}
}
}
impl From<Route> for types::Route {
fn from(from: Route) -> Self {
Self {
dest: from.dest,
gateway: from.gateway,
device: from.device,
source: from.source,
scope: from.scope,
family: from.family.into(),
unknown_fields: Default::default(),
cached_size: Default::default(),
}
}
}
impl From<types::Route> for Route {
fn from(src: types::Route) -> Self {
Self {
dest: src.dest,
gateway: src.gateway,
device: src.device,
source: src.source,
scope: src.scope,
family: src.family.into(),
}
}
}
impl From<Routes> for agent::Routes {
fn from(from: Routes) -> Self {
Self {
Routes: from_vec(from.routes),
unknown_fields: Default::default(),
cached_size: Default::default(),
}
}
}
impl From<agent::Routes> for Routes {
fn from(src: agent::Routes) -> Self {
Self {
routes: into_vec(src.Routes),
}
}
}
impl From<CreateContainerRequest> for agent::CreateContainerRequest {
fn from(from: CreateContainerRequest) -> Self {
Self {
container_id: from.container_id,
exec_id: from.exec_id,
string_user: from_option(from.string_user),
devices: from_vec(from.devices),
storages: from_vec(from.storages),
OCI: from_option(from.oci),
sandbox_pidns: from.sandbox_pidns,
unknown_fields: Default::default(),
cached_size: Default::default(),
}
}
}
impl From<RemoveContainerRequest> for agent::RemoveContainerRequest {
fn from(from: RemoveContainerRequest) -> Self {
Self {
container_id: from.container_id,
timeout: from.timeout,
unknown_fields: Default::default(),
cached_size: Default::default(),
}
}
}
impl From<ContainerID> for agent::StartContainerRequest {
fn from(from: ContainerID) -> Self {
Self {
container_id: from.container_id,
unknown_fields: Default::default(),
cached_size: Default::default(),
}
}
}
impl From<ContainerID> for agent::StatsContainerRequest {
fn from(from: ContainerID) -> Self {
Self {
container_id: from.container_id,
unknown_fields: Default::default(),
cached_size: Default::default(),
}
}
}
impl From<ContainerID> for agent::PauseContainerRequest {
fn from(from: ContainerID) -> Self {
Self {
container_id: from.container_id,
unknown_fields: Default::default(),
cached_size: Default::default(),
}
}
}
impl From<ContainerID> for agent::ResumeContainerRequest {
fn from(from: ContainerID) -> Self {
Self {
container_id: from.container_id,
unknown_fields: Default::default(),
cached_size: Default::default(),
}
}
}
impl From<SignalProcessRequest> for agent::SignalProcessRequest {
fn from(from: SignalProcessRequest) -> Self {
Self {
container_id: from.container_id,
exec_id: from.exec_id,
signal: from.signal,
unknown_fields: Default::default(),
cached_size: Default::default(),
}
}
}
impl From<WaitProcessRequest> for agent::WaitProcessRequest {
fn from(from: WaitProcessRequest) -> Self {
Self {
container_id: from.container_id,
exec_id: from.exec_id,
unknown_fields: Default::default(),
cached_size: Default::default(),
}
}
}
impl From<UpdateContainerRequest> for agent::UpdateContainerRequest {
fn from(from: UpdateContainerRequest) -> Self {
Self {
container_id: from.container_id,
resources: from_option(Some(from.resources)),
unknown_fields: Default::default(),
cached_size: Default::default(),
}
}
}
impl From<WriteStreamRequest> for agent::WriteStreamRequest {
fn from(from: WriteStreamRequest) -> Self {
Self {
container_id: from.container_id,
exec_id: from.exec_id,
data: from.data,
unknown_fields: Default::default(),
cached_size: Default::default(),
}
}
}
impl From<agent::WriteStreamResponse> for WriteStreamResponse {
fn from(from: agent::WriteStreamResponse) -> Self {
Self { length: from.len }
}
}
impl From<ExecProcessRequest> for agent::ExecProcessRequest {
fn from(from: ExecProcessRequest) -> Self {
Self {
container_id: from.container_id,
exec_id: from.exec_id,
string_user: from_option(from.string_user),
process: from_option(from.process),
unknown_fields: Default::default(),
cached_size: Default::default(),
}
}
}
impl From<agent::CpuUsage> for CpuUsage {
fn from(src: agent::CpuUsage) -> Self {
Self {
total_usage: src.total_usage,
percpu_usage: src.percpu_usage,
usage_in_kernelmode: src.usage_in_kernelmode,
usage_in_usermode: src.usage_in_usermode,
}
}
}
impl From<agent::ThrottlingData> for ThrottlingData {
fn from(src: agent::ThrottlingData) -> Self {
Self {
periods: src.periods,
throttled_periods: src.throttled_periods,
throttled_time: src.throttled_time,
}
}
}
impl From<agent::CpuStats> for CpuStats {
fn from(src: agent::CpuStats) -> Self {
Self {
cpu_usage: into_option(src.cpu_usage),
throttling_data: into_option(src.throttling_data),
}
}
}
impl From<agent::MemoryData> for MemoryData {
fn from(src: agent::MemoryData) -> Self {
Self {
usage: src.usage,
max_usage: src.max_usage,
failcnt: src.failcnt,
limit: src.limit,
}
}
}
impl From<agent::MemoryStats> for MemoryStats {
fn from(src: agent::MemoryStats) -> Self {
Self {
cache: src.cache,
usage: into_option(src.usage),
swap_usage: into_option(src.swap_usage),
kernel_usage: into_option(src.kernel_usage),
use_hierarchy: src.use_hierarchy,
stats: into_hash_map(src.stats),
}
}
}
impl From<agent::PidsStats> for PidsStats {
fn from(src: agent::PidsStats) -> Self {
Self {
current: src.current,
limit: src.limit,
}
}
}
impl From<agent::BlkioStatsEntry> for BlkioStatsEntry {
fn from(src: agent::BlkioStatsEntry) -> Self {
Self {
major: src.major,
minor: src.minor,
op: src.op,
value: src.value,
}
}
}
impl From<agent::BlkioStats> for BlkioStats {
fn from(src: agent::BlkioStats) -> Self {
Self {
io_service_bytes_recursive: into_vec(src.io_service_bytes_recursive),
io_serviced_recursive: into_vec(src.io_serviced_recursive),
io_queued_recursive: into_vec(src.io_queued_recursive),
io_service_time_recursive: into_vec(src.io_service_time_recursive),
io_wait_time_recursive: into_vec(src.io_wait_time_recursive),
io_merged_recursive: into_vec(src.io_merged_recursive),
io_time_recursive: into_vec(src.io_time_recursive),
sectors_recursive: into_vec(src.sectors_recursive),
}
}
}
impl From<agent::HugetlbStats> for HugetlbStats {
fn from(src: agent::HugetlbStats) -> Self {
Self {
usage: src.usage,
max_usage: src.max_usage,
failcnt: src.failcnt,
}
}
}
impl From<agent::CgroupStats> for CgroupStats {
fn from(src: agent::CgroupStats) -> Self {
Self {
cpu_stats: into_option(src.cpu_stats),
memory_stats: into_option(src.memory_stats),
pids_stats: into_option(src.pids_stats),
blkio_stats: into_option(src.blkio_stats),
hugetlb_stats: into_hash_map(src.hugetlb_stats),
}
}
}
impl From<agent::NetworkStats> for NetworkStats {
fn from(src: agent::NetworkStats) -> Self {
Self {
name: src.name,
rx_bytes: src.rx_bytes,
rx_packets: src.rx_packets,
rx_errors: src.rx_errors,
rx_dropped: src.rx_dropped,
tx_bytes: src.tx_bytes,
tx_packets: src.tx_packets,
tx_errors: src.tx_errors,
tx_dropped: src.tx_dropped,
}
}
}
// translate ttrpc::agent response to interface::agent response
impl From<agent::StatsContainerResponse> for StatsContainerResponse {
fn from(src: agent::StatsContainerResponse) -> Self {
Self {
cgroup_stats: into_option(src.cgroup_stats),
network_stats: into_vec(src.network_stats),
}
}
}
impl From<ReadStreamRequest> for agent::ReadStreamRequest {
fn from(from: ReadStreamRequest) -> Self {
Self {
container_id: from.container_id,
exec_id: from.exec_id,
len: from.len,
unknown_fields: Default::default(),
cached_size: Default::default(),
}
}
}
impl From<agent::ReadStreamResponse> for ReadStreamResponse {
fn from(from: agent::ReadStreamResponse) -> Self {
Self { data: from.data }
}
}
impl From<CloseStdinRequest> for agent::CloseStdinRequest {
fn from(from: CloseStdinRequest) -> Self {
Self {
container_id: from.container_id,
exec_id: from.exec_id,
unknown_fields: Default::default(),
cached_size: Default::default(),
}
}
}
impl From<TtyWinResizeRequest> for agent::TtyWinResizeRequest {
fn from(from: TtyWinResizeRequest) -> Self {
Self {
container_id: from.container_id,
exec_id: from.exec_id,
row: from.row,
column: from.column,
unknown_fields: Default::default(),
cached_size: Default::default(),
}
}
}
impl From<UpdateInterfaceRequest> for agent::UpdateInterfaceRequest {
fn from(from: UpdateInterfaceRequest) -> Self {
Self {
interface: from_option(from.interface),
unknown_fields: Default::default(),
cached_size: Default::default(),
}
}
}
impl From<Empty> for agent::ListInterfacesRequest {
fn from(_: Empty) -> Self {
Self {
unknown_fields: Default::default(),
cached_size: Default::default(),
}
}
}
impl From<UpdateRoutesRequest> for agent::UpdateRoutesRequest {
fn from(from: UpdateRoutesRequest) -> Self {
Self {
routes: from_option(from.route),
unknown_fields: Default::default(),
cached_size: Default::default(),
}
}
}
impl From<Empty> for agent::ListRoutesRequest {
fn from(_: Empty) -> Self {
Self {
unknown_fields: Default::default(),
cached_size: Default::default(),
}
}
}
impl From<ARPNeighbor> for types::ARPNeighbor {
fn from(from: ARPNeighbor) -> Self {
Self {
toIPAddress: from_option(from.to_ip_address),
device: from.device,
lladdr: from.ll_addr,
state: from.state,
flags: from.flags,
unknown_fields: Default::default(),
cached_size: Default::default(),
}
}
}
impl From<ARPNeighbors> for agent::ARPNeighbors {
fn from(from: ARPNeighbors) -> Self {
Self {
ARPNeighbors: from_vec(from.neighbors),
unknown_fields: Default::default(),
cached_size: Default::default(),
}
}
}
impl From<AddArpNeighborRequest> for agent::AddARPNeighborsRequest {
fn from(from: AddArpNeighborRequest) -> Self {
Self {
neighbors: from_option(from.neighbors),
unknown_fields: Default::default(),
cached_size: Default::default(),
}
}
}
impl From<CreateSandboxRequest> for agent::CreateSandboxRequest {
fn from(from: CreateSandboxRequest) -> Self {
Self {
hostname: from.hostname,
dns: from_vec(from.dns),
storages: from_vec(from.storages),
sandbox_pidns: from.sandbox_pidns,
sandbox_id: from.sandbox_id,
guest_hook_path: from.guest_hook_path,
kernel_modules: from_vec(from.kernel_modules),
unknown_fields: Default::default(),
cached_size: Default::default(),
}
}
}
impl From<Empty> for agent::DestroySandboxRequest {
fn from(_: Empty) -> Self {
Self {
unknown_fields: Default::default(),
cached_size: Default::default(),
}
}
}
impl From<OnlineCPUMemRequest> for agent::OnlineCPUMemRequest {
fn from(from: OnlineCPUMemRequest) -> Self {
Self {
wait: from.wait,
nb_cpus: from.nb_cpus,
cpu_only: from.cpu_only,
unknown_fields: Default::default(),
cached_size: Default::default(),
}
}
}
impl From<ReseedRandomDevRequest> for agent::ReseedRandomDevRequest {
fn from(from: ReseedRandomDevRequest) -> Self {
Self {
data: from.data,
unknown_fields: Default::default(),
cached_size: Default::default(),
}
}
}
impl From<MemHotplugByProbeRequest> for agent::MemHotplugByProbeRequest {
fn from(from: MemHotplugByProbeRequest) -> Self {
Self {
memHotplugProbeAddr: from.mem_hotplug_probe_addr,
unknown_fields: Default::default(),
cached_size: Default::default(),
}
}
}
impl From<SetGuestDateTimeRequest> for agent::SetGuestDateTimeRequest {
fn from(from: SetGuestDateTimeRequest) -> Self {
Self {
Sec: from.sec,
Usec: from.usec,
unknown_fields: Default::default(),
cached_size: Default::default(),
}
}
}
impl From<agent::AgentDetails> for AgentDetails {
fn from(src: agent::AgentDetails) -> Self {
Self {
version: src.version,
init_daemon: src.init_daemon,
device_handlers: into_vec(src.device_handlers),
storage_handlers: into_vec(src.storage_handlers),
supports_seccomp: src.supports_seccomp,
}
}
}
impl From<agent::GuestDetailsResponse> for GuestDetailsResponse {
fn from(src: agent::GuestDetailsResponse) -> Self {
Self {
mem_block_size_bytes: src.mem_block_size_bytes,
agent_details: into_option(src.agent_details),
support_mem_hotplug_probe: src.support_mem_hotplug_probe,
}
}
}
impl From<CopyFileRequest> for agent::CopyFileRequest {
fn from(from: CopyFileRequest) -> Self {
Self {
path: from.path,
file_size: from.file_size,
file_mode: from.file_mode,
dir_mode: from.dir_mode,
uid: from.uid,
gid: from.gid,
offset: from.offset,
data: from.data,
unknown_fields: Default::default(),
cached_size: Default::default(),
}
}
}
impl From<agent::WaitProcessResponse> for WaitProcessResponse {
fn from(from: agent::WaitProcessResponse) -> Self {
Self {
status: from.status,
}
}
}
impl From<Empty> for agent::GetOOMEventRequest {
fn from(_: Empty) -> Self {
Self {
unknown_fields: Default::default(),
cached_size: Default::default(),
}
}
}
impl From<CheckRequest> for health::CheckRequest {
fn from(from: CheckRequest) -> Self {
Self {
service: from.service,
unknown_fields: Default::default(),
cached_size: Default::default(),
}
}
}
impl From<health::HealthCheckResponse> for HealthCheckResponse {
fn from(from: health::HealthCheckResponse) -> Self {
Self {
status: from.status as u32,
}
}
}
impl From<health::VersionCheckResponse> for VersionCheckResponse {
fn from(from: health::VersionCheckResponse) -> Self {
Self {
grpc_version: from.grpc_version,
agent_version: from.agent_version,
}
}
}
impl From<agent::OOMEvent> for OomEventResponse {
fn from(from: OOMEvent) -> Self {
Self {
container_id: from.container_id,
}
}
}

View File

@@ -0,0 +1,84 @@
// Copyright (c) 2019-2022 Alibaba Cloud
// Copyright (c) 2019-2022 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//
#[macro_use]
extern crate slog;
macro_rules! sl {
() => {
slog_scope::logger().new(slog::o!("subsystem" => "agent"))
};
}
pub mod kata;
mod log_forwarder;
mod sock;
mod types;
pub use types::{
ARPNeighbor, ARPNeighbors, AddArpNeighborRequest, BlkioStatsEntry, CheckRequest,
CloseStdinRequest, ContainerID, CopyFileRequest, CreateContainerRequest, CreateSandboxRequest,
Empty, ExecProcessRequest, GetGuestDetailsRequest, GuestDetailsResponse, HealthCheckResponse,
IPAddress, IPFamily, Interface, Interfaces, ListProcessesRequest, MemHotplugByProbeRequest,
OnlineCPUMemRequest, OomEventResponse, ReadStreamRequest, ReadStreamResponse,
RemoveContainerRequest, ReseedRandomDevRequest, Route, Routes, SetGuestDateTimeRequest,
SignalProcessRequest, StatsContainerResponse, Storage, TtyWinResizeRequest,
UpdateContainerRequest, UpdateInterfaceRequest, UpdateRoutesRequest, VersionCheckResponse,
WaitProcessRequest, WaitProcessResponse, WriteStreamRequest, WriteStreamResponse,
};
use anyhow::Result;
use async_trait::async_trait;
#[async_trait]
pub trait AgentManager: Send + Sync {
async fn set_socket_address(&self, address: &str) -> Result<()>;
async fn start(&self) -> Result<()>;
async fn stop(&self);
}
#[async_trait]
pub trait HealthService: Send + Sync {
async fn check(&self, req: CheckRequest) -> Result<HealthCheckResponse>;
async fn version(&self, req: CheckRequest) -> Result<VersionCheckResponse>;
}
#[async_trait]
pub trait Agent: AgentManager + HealthService + Send + Sync {
// sandbox
async fn create_sandbox(&self, req: CreateSandboxRequest) -> Result<Empty>;
async fn destroy_sandbox(&self, req: Empty) -> Result<Empty>;
// network
async fn add_arp_neighbors(&self, req: AddArpNeighborRequest) -> Result<Empty>;
async fn list_interfaces(&self, req: Empty) -> Result<Interfaces>;
async fn list_routes(&self, req: Empty) -> Result<Routes>;
async fn update_interface(&self, req: UpdateInterfaceRequest) -> Result<Interface>;
async fn update_routes(&self, req: UpdateRoutesRequest) -> Result<Routes>;
// container
async fn create_container(&self, req: CreateContainerRequest) -> Result<Empty>;
async fn pause_container(&self, req: ContainerID) -> Result<Empty>;
async fn remove_container(&self, req: RemoveContainerRequest) -> Result<Empty>;
async fn resume_container(&self, req: ContainerID) -> Result<Empty>;
async fn start_container(&self, req: ContainerID) -> Result<Empty>;
async fn stats_container(&self, req: ContainerID) -> Result<StatsContainerResponse>;
async fn update_container(&self, req: UpdateContainerRequest) -> Result<Empty>;
// process
async fn exec_process(&self, req: ExecProcessRequest) -> Result<Empty>;
async fn signal_process(&self, req: SignalProcessRequest) -> Result<Empty>;
async fn wait_process(&self, req: WaitProcessRequest) -> Result<WaitProcessResponse>;
// io and tty
async fn close_stdin(&self, req: CloseStdinRequest) -> Result<Empty>;
async fn read_stderr(&self, req: ReadStreamRequest) -> Result<ReadStreamResponse>;
async fn read_stdout(&self, req: ReadStreamRequest) -> Result<ReadStreamResponse>;
async fn tty_win_resize(&self, req: TtyWinResizeRequest) -> Result<Empty>;
async fn write_stdin(&self, req: WriteStreamRequest) -> Result<WriteStreamResponse>;
// utils
async fn copy_file(&self, req: CopyFileRequest) -> Result<Empty>;
async fn get_oom_event(&self, req: Empty) -> Result<OomEventResponse>;
}

View File

@@ -0,0 +1,159 @@
// Copyright (c) 2019-2022 Alibaba Cloud
// Copyright (c) 2019-2022 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//
use anyhow::Result;
use tokio::io::{AsyncBufReadExt, BufReader};
use crate::sock;
// https://github.com/slog-rs/slog/blob/master/src/lib.rs#L2082
const LOG_LEVEL_TRACE: &str = "TRCE";
const LOG_LEVEL_DEBUG: &str = "DEBG";
const LOG_LEVEL_INFO: &str = "INFO";
const LOG_LEVEL_WARNING: &str = "WARN";
const LOG_LEVEL_ERROR: &str = "ERRO";
const LOG_LEVEL_CRITICAL: &str = "CRIT";
pub(crate) struct LogForwarder {
task_handler: Option<tokio::task::JoinHandle<()>>,
}
impl LogForwarder {
pub(crate) fn new() -> Self {
Self { task_handler: None }
}
pub(crate) fn stop(&mut self) {
let task_handler = self.task_handler.take();
if let Some(handler) = task_handler {
handler.abort();
info!(sl!(), "abort log forwarder thread");
}
}
// start connect kata-agent log vsock and copy data to hypervisor's log stream
pub(crate) async fn start(
&mut self,
address: &str,
port: u32,
config: sock::ConnectConfig,
) -> Result<()> {
let logger = sl!().clone();
let address = address.to_string();
let task_handler = tokio::spawn(async move {
loop {
info!(logger, "try to connect to get agent log");
let sock = match sock::new(&address, port) {
Ok(sock) => sock,
Err(err) => {
error!(
sl!(),
"failed to new sock for address {:?} port {} error {:?}",
address,
port,
err
);
return;
}
};
match sock.connect(&config).await {
Ok(stream) => {
let stream = BufReader::new(stream);
let mut lines = stream.lines();
while let Ok(line) = lines.next_line().await {
if let Some(l) = line {
match parse_agent_log_level(&l) {
LOG_LEVEL_TRACE => trace!(sl!(), "{}", l),
LOG_LEVEL_DEBUG => debug!(sl!(), "{}", l),
LOG_LEVEL_WARNING => warn!(sl!(), "{}", l),
LOG_LEVEL_ERROR => error!(sl!(), "{}", l),
LOG_LEVEL_CRITICAL => crit!(sl!(), "{}", l),
_ => info!(sl!(), "{}", l),
}
}
}
}
Err(err) => {
warn!(logger, "connect agent vsock failed: {:?}", err);
}
}
}
});
self.task_handler = Some(task_handler);
Ok(())
}
}
pub fn parse_agent_log_level(s: &str) -> &str {
let v: serde_json::Result<serde_json::Value> = serde_json::from_str(s);
match v {
Err(_err) => LOG_LEVEL_INFO,
Ok(val) => {
match &val["level"] {
serde_json::Value::String(s) => match s.as_str() {
LOG_LEVEL_TRACE => LOG_LEVEL_TRACE,
LOG_LEVEL_DEBUG => LOG_LEVEL_DEBUG,
LOG_LEVEL_WARNING => LOG_LEVEL_WARNING,
LOG_LEVEL_ERROR => LOG_LEVEL_ERROR,
LOG_LEVEL_CRITICAL => LOG_LEVEL_CRITICAL,
_ => LOG_LEVEL_INFO, // info or other values will return info,
},
_ => LOG_LEVEL_INFO, // info or other values will return info,
}
}
}
}
#[cfg(test)]
mod tests {
use super::parse_agent_log_level;
#[test]
fn test_parse_agent_log_level() {
let cases = vec![
// normal cases
(
r#"{"msg":"child exited unexpectedly","level":"TRCE"}"#,
super::LOG_LEVEL_TRACE,
),
(
r#"{"msg":"child exited unexpectedly","level":"DEBG"}"#,
super::LOG_LEVEL_DEBUG,
),
(
r#"{"msg":"child exited unexpectedly","level":"INFO"}"#,
super::LOG_LEVEL_INFO,
),
(
r#"{"msg":"child exited unexpectedly","level":"WARN"}"#,
super::LOG_LEVEL_WARNING,
),
(
r#"{"msg":"child exited unexpectedly","level":"ERRO"}"#,
super::LOG_LEVEL_ERROR,
),
(
r#"{"msg":"child exited unexpectedly","level":"CRIT"}"#,
super::LOG_LEVEL_CRITICAL,
),
(
r#"{"msg":"child exited unexpectedly","level":"abc"}"#,
super::LOG_LEVEL_INFO,
),
// exception cases
(r#"{"not a valid json struct"}"#, super::LOG_LEVEL_INFO),
("not a valid json struct", super::LOG_LEVEL_INFO),
];
for case in cases.iter() {
let s = case.0;
let result = parse_agent_log_level(s);
let excepted = case.1;
assert_eq!(result, excepted);
}
}
}

View File

@@ -0,0 +1,81 @@
// Copyright (c) 2019-2022 Alibaba Cloud
// Copyright (c) 2019-2022 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//
use std::os::unix::prelude::AsRawFd;
use anyhow::{anyhow, Context, Result};
use async_trait::async_trait;
use tokio::{
io::{AsyncBufReadExt, AsyncWriteExt, BufReader},
net::UnixStream,
};
use super::{ConnectConfig, Sock, Stream};
unsafe impl Send for HybridVsock {}
unsafe impl Sync for HybridVsock {}
#[derive(Debug, PartialEq)]
pub struct HybridVsock {
uds: String,
port: u32,
}
impl HybridVsock {
pub fn new(uds: &str, port: u32) -> Self {
Self {
uds: uds.to_string(),
port,
}
}
}
#[async_trait]
impl Sock for HybridVsock {
async fn connect(&self, config: &ConnectConfig) -> Result<Stream> {
let retry_times = config.reconnect_timeout_ms / config.dial_timeout_ms;
for i in 0..retry_times {
match connect_helper(&self.uds, self.port).await {
Ok(stream) => {
info!(
sl!(),
"connect success on {} current client fd {}",
i,
stream.as_raw_fd()
);
return Ok(Stream::Unix(stream));
}
Err(err) => {
debug!(sl!(), "connect on {} err : {:?}", i, err);
tokio::time::sleep(std::time::Duration::from_millis(config.dial_timeout_ms))
.await;
continue;
}
}
}
Err(anyhow!("cannot connect to agent ttrpc server"))
}
}
async fn connect_helper(uds: &str, port: u32) -> Result<UnixStream> {
info!(sl!(), "connect uds {:?} port {}", &uds, port);
let mut stream = UnixStream::connect(&uds).await.context("connect")?;
stream
.write_all(format!("connect {}\n", port).as_bytes())
.await
.context("write all")?;
let mut reads = BufReader::new(&mut stream);
let mut response = String::new();
reads.read_line(&mut response).await.context("read line")?;
//info!(sl!(), "get socket resp: {}", response);
if !response.contains("OK") {
return Err(anyhow!(
"handshake error: malformed response code: {:?}",
response
));
}
Ok(stream)
}

View File

@@ -0,0 +1,159 @@
// Copyright (c) 2019-2022 Alibaba Cloud
// Copyright (c) 2019-2022 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//
mod hybrid_vsock;
pub use hybrid_vsock::HybridVsock;
mod vsock;
pub use vsock::Vsock;
use std::{
pin::Pin,
task::{Context as TaskContext, Poll},
{
os::unix::{io::IntoRawFd, prelude::RawFd},
sync::Arc,
},
};
use anyhow::{anyhow, Context, Result};
use async_trait::async_trait;
use tokio::{
io::{AsyncRead, ReadBuf},
net::UnixStream,
};
use url::Url;
const VSOCK_SCHEME: &str = "vsock";
const HYBRID_VSOCK_SCHEME: &str = "hvsock";
/// Socket stream
pub enum Stream {
// hvsock://<path>:<port>. Firecracker/Dragonball implements the virtio-vsock device
// model, and mediates communication between AF_UNIX sockets (on the host end)
// and AF_VSOCK sockets (on the guest end).
Unix(UnixStream),
// TODO: support vsock
// vsock://<cid>:<port>
}
impl Stream {
fn poll_read_priv(
&mut self,
cx: &mut TaskContext<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
// Safety: `UnixStream::read` correctly handles reads into uninitialized memory
match self {
Stream::Unix(stream) => Pin::new(stream).poll_read(cx, buf),
}
}
}
impl IntoRawFd for Stream {
fn into_raw_fd(self) -> RawFd {
match self {
Stream::Unix(stream) => match stream.into_std() {
Ok(stream) => stream.into_raw_fd(),
Err(err) => {
error!(sl!(), "failed to into std unix stream {:?}", err);
-1
}
},
}
}
}
impl AsyncRead for Stream {
fn poll_read(
self: Pin<&mut Self>,
cx: &mut TaskContext<'_>,
buf: &mut ReadBuf<'_>,
) -> Poll<std::io::Result<()>> {
// we know this is safe because doesn't moved
let me = unsafe { self.get_unchecked_mut() };
me.poll_read_priv(cx, buf)
}
}
/// Connect config
pub struct ConnectConfig {
dial_timeout_ms: u64,
reconnect_timeout_ms: u64,
}
impl ConnectConfig {
pub fn new(dial_timeout_ms: u64, reconnect_timeout_ms: u64) -> Self {
Self {
dial_timeout_ms,
reconnect_timeout_ms,
}
}
}
#[derive(Debug, PartialEq)]
enum SockType {
Vsock(Vsock),
HybridVsock(HybridVsock),
}
#[async_trait]
pub trait Sock: Send + Sync {
async fn connect(&self, config: &ConnectConfig) -> Result<Stream>;
}
// Supported sock address formats are:
// - vsock://<cid>:<port>
// - hvsock://<path>:<port>. Firecracker implements the virtio-vsock device
// model, and mediates communication between AF_UNIX sockets (on the host end)
// and AF_VSOCK sockets (on the guest end).
pub fn new(address: &str, port: u32) -> Result<Arc<dyn Sock>> {
match parse(address, port).context("parse url")? {
SockType::Vsock(sock) => Ok(Arc::new(sock)),
SockType::HybridVsock(sock) => Ok(Arc::new(sock)),
}
}
fn parse(address: &str, port: u32) -> Result<SockType> {
let url = Url::parse(address).context("parse url")?;
match url.scheme() {
VSOCK_SCHEME => {
let cid = url
.host_str()
.unwrap_or_default()
.parse::<u32>()
.context("parse cid")?;
Ok(SockType::Vsock(Vsock::new(cid, port)))
}
HYBRID_VSOCK_SCHEME => {
let path: Vec<&str> = url.path().split(':').collect();
if path.len() != 1 {
return Err(anyhow!("invalid path {:?}", path));
}
let uds = path[0];
Ok(SockType::HybridVsock(HybridVsock::new(uds, port)))
}
_ => Err(anyhow!("Unsupported scheme")),
}
}
#[cfg(test)]
mod test {
use super::{hybrid_vsock::HybridVsock, parse, vsock::Vsock, SockType};
#[test]
fn test_parse_url() {
// check vsock
let vsock = parse("vsock://123", 456).unwrap();
assert_eq!(vsock, SockType::Vsock(Vsock::new(123, 456)));
// check hybrid vsock
let hvsock = parse("hvsock:///tmp/test.hvsock", 456).unwrap();
assert_eq!(
hvsock,
SockType::HybridVsock(HybridVsock::new("/tmp/test.hvsock", 456))
);
}
}

View File

@@ -0,0 +1,32 @@
// Copyright (c) 2019-2022 Alibaba Cloud
// Copyright (c) 2019-2022 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//
use anyhow::Result;
use async_trait::async_trait;
use super::{ConnectConfig, Sock, Stream};
unsafe impl Send for Vsock {}
unsafe impl Sync for Vsock {}
#[derive(Debug, PartialEq)]
pub struct Vsock {
cid: u32,
port: u32,
}
impl Vsock {
pub fn new(cid: u32, port: u32) -> Self {
Self { cid, port }
}
}
#[async_trait]
impl Sock for Vsock {
async fn connect(&self, _config: &ConnectConfig) -> Result<Stream> {
todo!()
}
}

View File

@@ -0,0 +1,454 @@
// Copyright (c) 2019-2022 Alibaba Cloud
// Copyright (c) 2019-2022 Ant Group
//
// SPDX-License-Identifier: Apache-2.0
//
use serde::Deserialize;
#[derive(PartialEq, Clone, Default)]
pub struct Empty {}
impl Empty {
pub fn new() -> Self {
Self::default()
}
}
#[derive(PartialEq, Clone, Default)]
pub struct StringUser {
pub uid: String,
pub gid: String,
pub additional_gids: Vec<String>,
}
#[derive(PartialEq, Clone, Default)]
pub struct Device {
pub id: String,
pub field_type: String,
pub vm_path: String,
pub container_path: String,
pub options: Vec<String>,
}
#[derive(Debug, PartialEq, Clone, Default)]
pub struct Storage {
pub driver: String,
pub driver_options: Vec<String>,
pub source: String,
pub fs_type: String,
pub options: Vec<String>,
pub mount_point: String,
}
#[derive(Deserialize, Clone, PartialEq, Eq, Debug, Hash)]
pub enum IPFamily {
V4 = 0,
V6 = 1,
}
impl ::std::default::Default for IPFamily {
fn default() -> Self {
IPFamily::V4
}
}
#[derive(Deserialize, Debug, PartialEq, Clone, Default)]
pub struct IPAddress {
pub family: IPFamily,
pub address: String,
pub mask: String,
}
#[derive(Deserialize, Debug, PartialEq, Clone, Default)]
pub struct Interface {
pub device: String,
pub name: String,
pub ip_addresses: Vec<IPAddress>,
pub mtu: u64,
pub hw_addr: String,
#[serde(default)]
pub pci_addr: String,
#[serde(default)]
pub field_type: String,
#[serde(default)]
pub raw_flags: u32,
}
#[derive(PartialEq, Clone, Default)]
pub struct Interfaces {
pub interfaces: Vec<Interface>,
}
#[derive(Deserialize, Debug, PartialEq, Clone, Default)]
pub struct Route {
pub dest: String,
pub gateway: String,
pub device: String,
pub source: String,
pub scope: u32,
pub family: IPFamily,
}
#[derive(Deserialize, Debug, PartialEq, Clone, Default)]
pub struct Routes {
pub routes: Vec<Route>,
}
#[derive(PartialEq, Clone, Default)]
pub struct CreateContainerRequest {
pub container_id: String,
pub exec_id: String,
pub string_user: Option<StringUser>,
pub devices: Vec<Device>,
pub storages: Vec<Storage>,
pub oci: Option<oci::Spec>,
pub guest_hooks: Option<oci::Hooks>,
pub sandbox_pidns: bool,
pub rootfs_mounts: Vec<oci::Mount>,
}
#[derive(PartialEq, Clone, Default)]
pub struct ContainerID {
pub container_id: String,
}
impl ContainerID {
pub fn new(id: &str) -> Self {
Self {
container_id: id.to_string(),
}
}
}
#[derive(PartialEq, Clone, Debug, Default)]
pub struct RemoveContainerRequest {
pub container_id: String,
pub timeout: u32,
}
impl RemoveContainerRequest {
pub fn new(id: &str, timeout: u32) -> Self {
Self {
container_id: id.to_string(),
timeout,
}
}
}
#[derive(PartialEq, Clone, Default)]
pub struct SignalProcessRequest {
pub container_id: String,
pub exec_id: String,
pub signal: u32,
}
#[derive(PartialEq, Clone, Default)]
pub struct WaitProcessRequest {
pub container_id: String,
pub exec_id: String,
}
#[derive(PartialEq, Clone, Default)]
pub struct ListProcessesRequest {
pub container_id: String,
pub format: String,
pub args: Vec<String>,
}
#[derive(PartialEq, Clone, Default)]
pub struct UpdateContainerRequest {
pub container_id: String,
pub resources: oci::LinuxResources,
pub mounts: Vec<oci::Mount>,
}
#[derive(PartialEq, Clone, Default)]
pub struct WriteStreamRequest {
pub container_id: String,
pub exec_id: String,
pub data: Vec<u8>,
}
#[derive(PartialEq, Clone, Default)]
pub struct WriteStreamResponse {
pub length: u32,
}
#[derive(PartialEq, Clone, Default)]
pub struct ExecProcessRequest {
pub container_id: String,
pub exec_id: String,
pub string_user: Option<StringUser>,
pub process: Option<oci::Process>,
}
#[derive(PartialEq, Clone, Default, Debug)]
pub struct CpuUsage {
pub total_usage: u64,
pub percpu_usage: ::std::vec::Vec<u64>,
pub usage_in_kernelmode: u64,
pub usage_in_usermode: u64,
}
#[derive(PartialEq, Clone, Default, Debug)]
pub struct ThrottlingData {
pub periods: u64,
pub throttled_periods: u64,
pub throttled_time: u64,
}
#[derive(PartialEq, Clone, Default, Debug)]
pub struct LoadData {
pub one: String,
pub five: String,
pub fifteen: String,
}
#[derive(PartialEq, Clone, Default, Debug)]
pub struct CpuStats {
pub cpu_usage: Option<CpuUsage>,
pub throttling_data: Option<ThrottlingData>,
}
#[derive(PartialEq, Clone, Default, Debug)]
pub struct MemoryData {
pub usage: u64,
pub max_usage: u64,
pub failcnt: u64,
pub limit: u64,
}
#[derive(PartialEq, Clone, Default, Debug)]
pub struct MemoryStats {
pub cache: u64,
pub usage: Option<MemoryData>,
pub swap_usage: Option<MemoryData>,
pub kernel_usage: Option<MemoryData>,
pub use_hierarchy: bool,
pub stats: ::std::collections::HashMap<String, u64>,
}
#[derive(PartialEq, Clone, Default, Debug)]
pub struct PidsStats {
pub current: u64,
pub limit: u64,
}
#[derive(PartialEq, Clone, Default, Debug)]
pub struct BlkioStatsEntry {
pub major: u64,
pub minor: u64,
pub op: String,
pub value: u64,
}
#[derive(PartialEq, Clone, Default, Debug)]
pub struct BlkioStats {
pub io_service_bytes_recursive: Vec<BlkioStatsEntry>,
pub io_serviced_recursive: Vec<BlkioStatsEntry>,
pub io_queued_recursive: Vec<BlkioStatsEntry>,
pub io_service_time_recursive: Vec<BlkioStatsEntry>,
pub io_wait_time_recursive: Vec<BlkioStatsEntry>,
pub io_merged_recursive: Vec<BlkioStatsEntry>,
pub io_time_recursive: Vec<BlkioStatsEntry>,
pub sectors_recursive: Vec<BlkioStatsEntry>,
}
#[derive(PartialEq, Clone, Default, Debug)]
pub struct HugetlbStats {
pub usage: u64,
pub max_usage: u64,
pub failcnt: u64,
}
#[derive(PartialEq, Clone, Default, Debug)]
pub struct CgroupStats {
pub cpu_stats: Option<CpuStats>,
pub memory_stats: Option<MemoryStats>,
pub pids_stats: Option<PidsStats>,
pub blkio_stats: Option<BlkioStats>,
pub hugetlb_stats: ::std::collections::HashMap<String, HugetlbStats>,
}
#[derive(PartialEq, Clone, Default, Debug)]
pub struct NetworkStats {
pub name: String,
pub rx_bytes: u64,
pub rx_packets: u64,
pub rx_errors: u64,
pub rx_dropped: u64,
pub tx_bytes: u64,
pub tx_packets: u64,
pub tx_errors: u64,
pub tx_dropped: u64,
}
#[derive(PartialEq, Clone, Default, Debug)]
pub struct StatsContainerResponse {
pub cgroup_stats: Option<CgroupStats>,
pub network_stats: Vec<NetworkStats>,
}
#[derive(PartialEq, Clone, Default, Debug)]
pub struct WaitProcessResponse {
pub status: i32,
}
#[derive(PartialEq, Clone, Default)]
pub struct ReadStreamRequest {
pub container_id: String,
pub exec_id: String,
pub len: u32,
}
#[derive(PartialEq, Clone, Default)]
pub struct ReadStreamResponse {
pub data: Vec<u8>,
}
#[derive(PartialEq, Clone, Default)]
pub struct CloseStdinRequest {
pub container_id: String,
pub exec_id: String,
}
#[derive(PartialEq, Clone, Default)]
pub struct TtyWinResizeRequest {
pub container_id: String,
pub exec_id: String,
pub row: u32,
pub column: u32,
}
#[derive(Debug, PartialEq, Clone, Default)]
pub struct UpdateInterfaceRequest {
pub interface: Option<Interface>,
}
#[derive(PartialEq, Clone, Default, Debug)]
pub struct UpdateRoutesRequest {
pub route: Option<Routes>,
}
#[derive(Deserialize, PartialEq, Clone, Default, Debug)]
pub struct ARPNeighbor {
pub to_ip_address: Option<IPAddress>,
pub device: String,
pub ll_addr: String,
pub state: i32,
pub flags: i32,
}
#[derive(PartialEq, Clone, Default, Debug)]
pub struct ARPNeighbors {
pub neighbors: Vec<ARPNeighbor>,
}
#[derive(PartialEq, Clone, Default, Debug)]
pub struct AddArpNeighborRequest {
pub neighbors: Option<ARPNeighbors>,
}
#[derive(PartialEq, Clone, Default)]
pub struct KernelModule {
pub name: String,
pub parameters: Vec<String>,
}
#[derive(PartialEq, Clone, Default)]
pub struct CreateSandboxRequest {
pub hostname: String,
pub dns: Vec<String>,
pub storages: Vec<Storage>,
pub sandbox_pidns: bool,
pub sandbox_id: String,
pub guest_hook_path: String,
pub kernel_modules: Vec<KernelModule>,
}
#[derive(PartialEq, Clone, Default)]
pub struct OnlineCPUMemRequest {
pub wait: bool,
pub nb_cpus: u32,
pub cpu_only: bool,
}
#[derive(PartialEq, Clone, Default)]
pub struct ReseedRandomDevRequest {
pub data: ::std::vec::Vec<u8>,
}
#[derive(PartialEq, Clone, Default)]
pub struct GetGuestDetailsRequest {
pub mem_block_size: bool,
pub mem_hotplug_probe: bool,
}
#[derive(PartialEq, Clone, Default)]
pub struct MemHotplugByProbeRequest {
pub mem_hotplug_probe_addr: ::std::vec::Vec<u64>,
}
#[derive(PartialEq, Clone, Default)]
pub struct SetGuestDateTimeRequest {
pub sec: i64,
pub usec: i64,
}
#[derive(PartialEq, Clone, Default)]
pub struct AgentDetails {
pub version: String,
pub init_daemon: bool,
pub device_handlers: Vec<String>,
pub storage_handlers: Vec<std::string::String>,
pub supports_seccomp: bool,
}
#[derive(PartialEq, Clone, Default)]
pub struct GuestDetailsResponse {
pub mem_block_size_bytes: u64,
pub agent_details: Option<AgentDetails>,
pub support_mem_hotplug_probe: bool,
}
#[derive(PartialEq, Clone, Default)]
pub struct CopyFileRequest {
pub path: String,
pub file_size: i64,
pub file_mode: u32,
pub dir_mode: u32,
pub uid: i32,
pub gid: i32,
pub offset: i64,
pub data: ::std::vec::Vec<u8>,
}
#[derive(PartialEq, Clone, Default, Debug)]
pub struct CheckRequest {
pub service: String,
}
impl CheckRequest {
pub fn new(service: &str) -> Self {
Self {
service: service.to_string(),
}
}
}
#[derive(PartialEq, Clone, Default, Debug)]
pub struct HealthCheckResponse {
pub status: u32,
}
#[derive(PartialEq, Clone, Default, Debug)]
pub struct VersionCheckResponse {
pub grpc_version: String,
pub agent_version: String,
}
#[derive(PartialEq, Clone, Default, Debug)]
pub struct OomEventResponse {
pub container_id: String,
}