diff --git a/src/agent/Cargo.lock b/src/agent/Cargo.lock index 1c027bb58e..21600eac9e 100644 --- a/src/agent/Cargo.lock +++ b/src/agent/Cargo.lock @@ -54,6 +54,17 @@ version = "0.5.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cff77d8686867eceff3105329d4698d96c2391c176d5d03adc90c7389162b5b8" +[[package]] +name = "async-trait" +version = "0.1.42" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8d3a45e77e34375a7923b1e8febb049bb011f064714a8e17a1a616fef01da13d" +dependencies = [ + "proc-macro2 1.0.24", + "quote 1.0.7", + "syn 1.0.45", +] + [[package]] name = "autocfg" version = "1.0.1" @@ -113,6 +124,12 @@ dependencies = [ "iovec", ] +[[package]] +name = "bytes" +version = "0.5.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0e4cec68f03f32e44924783795810fa50a7035d8c8ebe78580ad7e6c703fba38" + [[package]] name = "caps" version = "0.3.4" @@ -165,7 +182,7 @@ dependencies = [ "num-integer", "num-traits", "time", - "winapi", + "winapi 0.3.9", ] [[package]] @@ -241,7 +258,7 @@ checksum = "8e93d7f5705de3e49895a2b5e0b8855a1c27f080192ae9c32a6432d50741a57a" dependencies = [ "libc", "redox_users", - "winapi", + "winapi 0.3.9", ] [[package]] @@ -268,7 +285,7 @@ checksum = "6eab5ee3df98a279d9b316b1af6ac95422127b1290317e6d18c1743c99418b01" dependencies = [ "errno-dragonfly", "libc", - "winapi", + "winapi 0.3.9", ] [[package]] @@ -312,10 +329,115 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" [[package]] -name = "futures" -version = "0.1.30" +name = "fuchsia-zircon" +version = "0.3.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4c7e4c2612746b0df8fed4ce0c69156021b704c9aefa360311c04e6e9e002eed" +checksum = "2e9763c69ebaae630ba35f74888db465e49e259ba1bc0eda7d06f4a067615d82" +dependencies = [ + "bitflags", + "fuchsia-zircon-sys", +] + +[[package]] +name = "fuchsia-zircon-sys" +version = "0.3.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7" + +[[package]] +name = "futures" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b3b0c040a1fe6529d30b3c5944b280c7f0dcb2930d2c3062bca967b602583d0" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-channel" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b7109687aa4e177ef6fe84553af6280ef2778bdb7783ba44c9dc3399110fe64" +dependencies = [ + "futures-core", + "futures-sink", +] + +[[package]] +name = "futures-core" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "847ce131b72ffb13b6109a221da9ad97a64cbe48feb1028356b836b47b8f1748" + +[[package]] +name = "futures-executor" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4caa2b2b68b880003057c1dd49f1ed937e38f22fcf6c212188a121f08cf40a65" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "611834ce18aaa1bd13c4b374f5d653e1027cf99b6b502584ff8c9a64413b30bb" + +[[package]] +name = "futures-macro" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77408a692f1f97bcc61dc001d752e00643408fbc922e4d634c655df50d595556" +dependencies = [ + "proc-macro-hack", + "proc-macro2 1.0.24", + "quote 1.0.7", + "syn 1.0.45", +] + +[[package]] +name = "futures-sink" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f878195a49cee50e006b02b93cf7e0a95a38ac7b776b4c4d9cc1207cd20fcb3d" + +[[package]] +name = "futures-task" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7c554eb5bf48b2426c4771ab68c6b14468b6e76cc90996f528c3338d761a4d0d" +dependencies = [ + "once_cell", +] + +[[package]] +name = "futures-util" +version = "0.3.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d304cff4a7b99cfb7986f7d43fbe93d175e72e704a8860787cc95e9ffd85cbd2" +dependencies = [ + "futures-channel", + "futures-core", + "futures-io", + "futures-macro", + "futures-sink", + "futures-task", + "memchr", + "pin-project", + "pin-utils", + "proc-macro-hack", + "proc-macro-nested", + "slab", +] [[package]] name = "gcc" @@ -349,6 +471,15 @@ dependencies = [ "unicode-segmentation", ] +[[package]] +name = "hermit-abi" +version = "0.1.17" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5aca5565f760fb5b220e499d72710ed156fdb74e631659e99377d9ebfbd13ae8" +dependencies = [ + "libc", +] + [[package]] name = "hex" version = "0.4.2" @@ -384,6 +515,7 @@ name = "kata-agent" version = "0.1.0" dependencies = [ "anyhow", + "async-trait", "cgroups-rs", "lazy_static", "libc", @@ -407,9 +539,20 @@ dependencies = [ "slog-scope", "slog-stdlog", "tempfile", + "tokio", "ttrpc", ] +[[package]] +name = "kernel32-sys" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d" +dependencies = [ + "winapi 0.2.8", + "winapi-build", +] + [[package]] name = "lazy_static" version = "1.4.0" @@ -491,12 +634,65 @@ dependencies = [ "autocfg", ] +[[package]] +name = "mio" +version = "0.6.23" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4afd66f5b91bf2a3bc13fad0e21caedac168ca4c707504e75585648ae80e4cc4" +dependencies = [ + "cfg-if 0.1.10", + "fuchsia-zircon", + "fuchsia-zircon-sys", + "iovec", + "kernel32-sys", + "libc", + "log", + "miow", + "net2", + "slab", + "winapi 0.2.8", +] + +[[package]] +name = "mio-uds" +version = "0.6.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "afcb699eb26d4332647cc848492bbc15eafb26f08d0304550d5aa1f612e066f0" +dependencies = [ + "iovec", + "libc", + "mio", +] + +[[package]] +name = "miow" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ebd808424166322d4a38da87083bfddd3ac4c131334ed55856112eb06d46944d" +dependencies = [ + "kernel32-sys", + "net2", + "winapi 0.2.8", + "ws2_32-sys", +] + [[package]] name = "multimap" version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2eb04b9f127583ed176e163fb9ec6f3e793b87e21deedd5734a69386a18a0151" +[[package]] +name = "net2" +version = "0.2.37" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "391630d12b68002ae1e25e8f974306474966550ad82dac6886fb8910c19568ae" +dependencies = [ + "cfg-if 0.1.10", + "libc", + "winapi 0.3.9", +] + [[package]] name = "netlink" version = "0.1.0" @@ -584,6 +780,16 @@ dependencies = [ "autocfg", ] +[[package]] +name = "num_cpus" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05499f3756671c15885fee9034446956fff3f243d6077b91e5767df161f766b3" +dependencies = [ + "hermit-abi", + "libc", +] + [[package]] name = "object" version = "0.21.1" @@ -600,6 +806,12 @@ dependencies = [ "serde_json", ] +[[package]] +name = "once_cell" +version = "1.5.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "13bd41f508810a131401606d54ac32a467c97172d74ba7662562ebba5ad07fa0" + [[package]] name = "parking_lot" version = "0.10.2" @@ -621,7 +833,7 @@ dependencies = [ "libc", "redox_syscall", "smallvec", - "winapi", + "winapi 0.3.9", ] [[package]] @@ -652,6 +864,38 @@ dependencies = [ "fixedbitset", ] +[[package]] +name = "pin-project" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9ccc2237c2c489783abd8c4c80e5450fc0e98644555b1364da68cc29aa151ca7" +dependencies = [ + "pin-project-internal", +] + +[[package]] +name = "pin-project-internal" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f8e8d2bf0b23038a4424865103a4df472855692821aab4e4f5c3312d461d9e5f" +dependencies = [ + "proc-macro2 1.0.24", + "quote 1.0.7", + "syn 1.0.45", +] + +[[package]] +name = "pin-project-lite" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c917123afa01924fc84bb20c4c03f004d9c38e5127e3c039bbf7f4b9c76a2f6b" + +[[package]] +name = "pin-utils" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" + [[package]] name = "ppv-lite86" version = "0.2.9" @@ -668,6 +912,18 @@ dependencies = [ "nix 0.19.0", ] +[[package]] +name = "proc-macro-hack" +version = "0.5.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dbf0c48bc1d91375ae5c3cd81e3722dff1abcf81a30960240640d223f59fe0e5" + +[[package]] +name = "proc-macro-nested" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "eba180dafb9038b050a4c280019bbedf9f2467b61e5d892dcad585bb57aadc5a" + [[package]] name = "proc-macro2" version = "0.4.30" @@ -736,7 +992,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "96d14b1c185652833d24aaad41c5832b0be5616a590227c1fbff57c616754b23" dependencies = [ "byteorder", - "bytes", + "bytes 0.4.12", "prost-derive", ] @@ -746,7 +1002,7 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eb788126ea840817128183f8f603dce02cb7aea25c2a0b764359d8e20010702e" dependencies = [ - "bytes", + "bytes 0.4.12", "heck", "itertools", "log", @@ -777,7 +1033,7 @@ version = "0.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1de482a366941c8d56d19b650fac09ca08508f2a696119ee7513ad590c8bac6f" dependencies = [ - "bytes", + "bytes 0.4.12", "prost", ] @@ -810,7 +1066,7 @@ dependencies = [ name = "protocols" version = "0.1.0" dependencies = [ - "futures", + "async-trait", "protobuf", "ttrpc", "ttrpc-codegen", @@ -916,7 +1172,7 @@ version = "0.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3acd125665422973a33ac9d3dd2df85edad0f4ae9b00dafb1a05e43a9f5ef8e7" dependencies = [ - "winapi", + "winapi 0.3.9", ] [[package]] @@ -1087,6 +1343,12 @@ dependencies = [ "libc", ] +[[package]] +name = "slab" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8" + [[package]] name = "slash-formatter" version = "2.2.6" @@ -1196,7 +1458,7 @@ dependencies = [ "rand", "redox_syscall", "remove_dir_all", - "winapi", + "winapi 0.3.9", ] [[package]] @@ -1236,21 +1498,73 @@ checksum = "6db9e6914ab8b1ae1c260a4ae7a49b6c5611b40328a735b21862567685e73255" dependencies = [ "libc", "wasi 0.10.0+wasi-snapshot-preview1", - "winapi", + "winapi 0.3.9", +] + +[[package]] +name = "tokio" +version = "0.2.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "099837d3464c16a808060bb3f02263b412f6fafcb5d01c533d309985fbeebe48" +dependencies = [ + "bytes 0.5.6", + "fnv", + "futures-core", + "lazy_static", + "libc", + "memchr", + "mio", + "mio-uds", + "num_cpus", + "pin-project-lite", + "slab", + "tokio-macros", +] + +[[package]] +name = "tokio-macros" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e44da00bfc73a25f814cd8d7e57a68a5c31b74b3152a0a1d1f590c97ed06265a" +dependencies = [ + "proc-macro2 1.0.24", + "quote 1.0.7", + "syn 1.0.45", +] + +[[package]] +name = "tokio-vsock" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "547a35667d4d842422da7f5528612321770f2f640e7fd5df0431de7b717fb2b4" +dependencies = [ + "bytes 0.4.12", + "futures", + "iovec", + "libc", + "mio", + "nix 0.17.0", + "tokio", + "vsock", ] [[package]] name = "ttrpc" -version = "0.3.0" +version = "0.4.13" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bfa9da24c351f0feef5e66c0b28c18373a7ef3e1bfdfd5852170de494f9bf870" +checksum = "d6e99ffa09e7fbe514b58b01bd17d71e3ed4dd27c588afa43d41ec0b7fc90b0a" dependencies = [ + "async-trait", "byteorder", + "futures", "libc", "log", "nix 0.16.1", "protobuf", "protobuf-codegen-pure", + "thiserror", + "tokio", + "tokio-vsock", ] [[package]] @@ -1310,6 +1624,16 @@ version = "1.0.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d" +[[package]] +name = "vsock" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dba063357047c0f2216c7c653879ea4e5e198d0c3cde7efa37ebfd9039b48491" +dependencies = [ + "libc", + "nix 0.17.0", +] + [[package]] name = "wasi" version = "0.9.0+wasi-snapshot-preview1" @@ -1332,6 +1656,12 @@ dependencies = [ "libc", ] +[[package]] +name = "winapi" +version = "0.2.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "167dc9d6949a9b857f3451275e911c3f44255842c1f7a76f33c55103a909087a" + [[package]] name = "winapi" version = "0.3.9" @@ -1342,6 +1672,12 @@ dependencies = [ "winapi-x86_64-pc-windows-gnu", ] +[[package]] +name = "winapi-build" +version = "0.1.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2d315eee3b34aca4797b2da6b13ed88266e6d612562a0c46390af8299fc699bc" + [[package]] name = "winapi-i686-pc-windows-gnu" version = "0.4.0" @@ -1353,3 +1689,13 @@ name = "winapi-x86_64-pc-windows-gnu" version = "0.4.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" + +[[package]] +name = "ws2_32-sys" +version = "0.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d59cefebd0c892fa2dd6de581e937301d8552cb44489cdff035c6187cb63fa5e" +dependencies = [ + "winapi 0.2.8", + "winapi-build", +] diff --git a/src/agent/Cargo.toml b/src/agent/Cargo.toml index 07fc2f6b15..1fc3e287e7 100644 --- a/src/agent/Cargo.toml +++ b/src/agent/Cargo.toml @@ -11,7 +11,7 @@ rustjail = { path = "rustjail" } protocols = { path = "protocols" } netlink = { path = "netlink", features = ["with-log", "with-agent-handler"] } lazy_static = "1.3.0" -ttrpc = "0.3.0" +ttrpc = { version="0.4.13", features=["async"] } protobuf = "=2.14.0" libc = "0.2.58" nix = "0.17.0" @@ -21,6 +21,8 @@ signal-hook = "0.1.9" scan_fmt = "0.2.3" scopeguard = "1.0.0" regex = "1" +tokio = { version="0.2", features = ["macros", "rt-threaded"] } +async-trait = "0.1.42" # slog: # - Dynamic keys required to allow HashMap keys to be slog::Serialized. diff --git a/src/agent/protocols/Cargo.toml b/src/agent/protocols/Cargo.toml index 59ab72427a..f44c4bc501 100644 --- a/src/agent/protocols/Cargo.toml +++ b/src/agent/protocols/Cargo.toml @@ -5,9 +5,9 @@ authors = ["The Kata Containers community "] edition = "2018" [dependencies] -ttrpc = "0.3.0" +ttrpc = { version="0.4.13", features=["async"] } +async-trait = "0.1.42" protobuf = "=2.14.0" -futures = "0.1.27" [build-dependencies] ttrpc-codegen = "0.1.2" diff --git a/src/agent/protocols/build.rs b/src/agent/protocols/build.rs index fcc4434478..21779f1dcb 100644 --- a/src/agent/protocols/build.rs +++ b/src/agent/protocols/build.rs @@ -3,8 +3,8 @@ // SPDX-License-Identifier: Apache-2.0 // -use std::fs::File; -use std::io::{Read, Write}; +use std::fs; +use ttrpc_codegen::{Codegen, Customize}; fn main() { let protos = vec![ @@ -18,13 +18,17 @@ fn main() { // Tell Cargo that if the .proto files changed, to rerun this build script. protos .iter() - .for_each(|p| println!("cargo:rerun-if-changed={}", &p)); + .for_each(|p| println!("cargo:rerun-if-changed={}", p)); - ttrpc_codegen::Codegen::new() + Codegen::new() .out_dir("src") .inputs(&protos) .include("protos") .rust_protobuf() + .customize(Customize { + async_server: true, + ..Default::default() + }) .run() .expect("Gen codes failed."); @@ -40,16 +44,6 @@ fn main() { } fn replace_text_in_file(file_name: &str, from: &str, to: &str) -> Result<(), std::io::Error> { - let mut src = File::open(file_name)?; - let mut contents = String::new(); - src.read_to_string(&mut contents).unwrap(); - drop(src); - - let new_contents = contents.replace(from, to); - - let mut dst = File::create(&file_name)?; - dst.write_all(new_contents.as_bytes())?; - - Ok(()) + let new_contents = fs::read_to_string(file_name)?.replace(from, to); + fs::write(&file_name, new_contents.as_bytes()) } - diff --git a/src/agent/src/main.rs b/src/agent/src/main.rs index 6bd2470287..aef948769c 100644 --- a/src/agent/src/main.rs +++ b/src/agent/src/main.rs @@ -100,7 +100,8 @@ fn announce(logger: &Logger, config: &agentConfig) { ); } -fn main() -> Result<()> { +#[tokio::main] +async fn main() -> Result<()> { let args: Vec = env::args().collect(); if args.len() == 2 && args[1] == "--version" { @@ -205,14 +206,14 @@ fn main() -> Result<()> { _log_guard = Ok(slog_stdlog::init().map_err(|e| e)?); } - start_sandbox(&logger, &config, init_mode)?; + start_sandbox(&logger, &config, init_mode).await?; let _ = log_handle.join(); Ok(()) } -fn start_sandbox(logger: &Logger, config: &agentConfig, init_mode: bool) -> Result<()> { +async fn start_sandbox(logger: &Logger, config: &agentConfig, init_mode: bool) -> Result<()> { let shells = SHELLS.clone(); let debug_console_vport = config.debug_console_vport as u32; @@ -254,13 +255,13 @@ fn start_sandbox(logger: &Logger, config: &agentConfig, init_mode: bool) -> Resu sandbox.lock().unwrap().sender = Some(tx); // vsock:///dev/vsock, port - let mut server = rpc::start(sandbox, config.server_addr.as_str()); + let mut server = rpc::start(sandbox, config.server_addr.as_str()).await?; - let _ = server.start().unwrap(); + server.start().await?; let _ = rx.recv()?; - server.shutdown(); + server.shutdown().await?; if let Some(handle) = shell_handle { handle.join().map_err(|e| anyhow!("{:?}", e))?; diff --git a/src/agent/src/rpc.rs b/src/agent/src/rpc.rs index 03d45bdfa2..50d7ac4a8a 100644 --- a/src/agent/src/rpc.rs +++ b/src/agent/src/rpc.rs @@ -3,10 +3,15 @@ // SPDX-License-Identifier: Apache-2.0 // +use async_trait::async_trait; use std::path::Path; use std::sync::mpsc::channel; use std::sync::{Arc, Mutex}; -use ttrpc::{self, error::get_rpc_status as ttrpc_error}; +use ttrpc::{ + self, + error::get_rpc_status as ttrpc_error, + r#async::{Server as TtrpcServer, TtrpcContext}, +}; use anyhow::{anyhow, Context, Result}; use oci::{LinuxNamespace, Root, Spec}; @@ -524,10 +529,11 @@ impl agentService { } } +#[async_trait] impl protocols::agent_ttrpc::AgentService for agentService { - fn create_container( + async fn create_container( &self, - _ctx: &ttrpc::TtrpcContext, + _ctx: &TtrpcContext, req: protocols::agent::CreateContainerRequest, ) -> ttrpc::Result { match self.do_create_container(req) { @@ -536,9 +542,9 @@ impl protocols::agent_ttrpc::AgentService for agentService { } } - fn start_container( + async fn start_container( &self, - _ctx: &ttrpc::TtrpcContext, + _ctx: &TtrpcContext, req: protocols::agent::StartContainerRequest, ) -> ttrpc::Result { match self.do_start_container(req) { @@ -547,9 +553,9 @@ impl protocols::agent_ttrpc::AgentService for agentService { } } - fn remove_container( + async fn remove_container( &self, - _ctx: &ttrpc::TtrpcContext, + _ctx: &TtrpcContext, req: protocols::agent::RemoveContainerRequest, ) -> ttrpc::Result { match self.do_remove_container(req) { @@ -558,9 +564,9 @@ impl protocols::agent_ttrpc::AgentService for agentService { } } - fn exec_process( + async fn exec_process( &self, - _ctx: &ttrpc::TtrpcContext, + _ctx: &TtrpcContext, req: protocols::agent::ExecProcessRequest, ) -> ttrpc::Result { match self.do_exec_process(req) { @@ -569,9 +575,9 @@ impl protocols::agent_ttrpc::AgentService for agentService { } } - fn signal_process( + async fn signal_process( &self, - _ctx: &ttrpc::TtrpcContext, + _ctx: &TtrpcContext, req: protocols::agent::SignalProcessRequest, ) -> ttrpc::Result { match self.do_signal_process(req) { @@ -580,18 +586,18 @@ impl protocols::agent_ttrpc::AgentService for agentService { } } - fn wait_process( + async fn wait_process( &self, - _ctx: &ttrpc::TtrpcContext, + _ctx: &TtrpcContext, req: protocols::agent::WaitProcessRequest, ) -> ttrpc::Result { self.do_wait_process(req) .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())) } - fn list_processes( + async fn list_processes( &self, - _ctx: &ttrpc::TtrpcContext, + _ctx: &TtrpcContext, req: protocols::agent::ListProcessesRequest, ) -> ttrpc::Result { let cid = req.container_id.clone(); @@ -673,9 +679,9 @@ impl protocols::agent_ttrpc::AgentService for agentService { Ok(resp) } - fn update_container( + async fn update_container( &self, - _ctx: &ttrpc::TtrpcContext, + _ctx: &TtrpcContext, req: protocols::agent::UpdateContainerRequest, ) -> ttrpc::Result { let cid = req.container_id.clone(); @@ -707,9 +713,9 @@ impl protocols::agent_ttrpc::AgentService for agentService { Ok(resp) } - fn stats_container( + async fn stats_container( &self, - _ctx: &ttrpc::TtrpcContext, + _ctx: &TtrpcContext, req: protocols::agent::StatsContainerRequest, ) -> ttrpc::Result { let cid = req.container_id; @@ -727,9 +733,9 @@ impl protocols::agent_ttrpc::AgentService for agentService { .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())) } - fn pause_container( + async fn pause_container( &self, - _ctx: &ttrpc::TtrpcContext, + _ctx: &TtrpcContext, req: protocols::agent::PauseContainerRequest, ) -> ttrpc::Result { let cid = req.get_container_id(); @@ -749,9 +755,9 @@ impl protocols::agent_ttrpc::AgentService for agentService { Ok(Empty::new()) } - fn resume_container( + async fn resume_container( &self, - _ctx: &ttrpc::TtrpcContext, + _ctx: &TtrpcContext, req: protocols::agent::ResumeContainerRequest, ) -> ttrpc::Result { let cid = req.get_container_id(); @@ -771,36 +777,36 @@ impl protocols::agent_ttrpc::AgentService for agentService { Ok(Empty::new()) } - fn write_stdin( + async fn write_stdin( &self, - _ctx: &ttrpc::TtrpcContext, + _ctx: &TtrpcContext, req: protocols::agent::WriteStreamRequest, ) -> ttrpc::Result { self.do_write_stream(req) .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())) } - fn read_stdout( + async fn read_stdout( &self, - _ctx: &ttrpc::TtrpcContext, + _ctx: &TtrpcContext, req: protocols::agent::ReadStreamRequest, ) -> ttrpc::Result { self.do_read_stream(req, true) .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())) } - fn read_stderr( + async fn read_stderr( &self, - _ctx: &ttrpc::TtrpcContext, + _ctx: &TtrpcContext, req: protocols::agent::ReadStreamRequest, ) -> ttrpc::Result { self.do_read_stream(req, false) .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())) } - fn close_stdin( + async fn close_stdin( &self, - _ctx: &ttrpc::TtrpcContext, + _ctx: &TtrpcContext, req: protocols::agent::CloseStdinRequest, ) -> ttrpc::Result { let cid = req.container_id.clone(); @@ -830,9 +836,9 @@ impl protocols::agent_ttrpc::AgentService for agentService { Ok(Empty::new()) } - fn tty_win_resize( + async fn tty_win_resize( &self, - _ctx: &ttrpc::TtrpcContext, + _ctx: &TtrpcContext, req: protocols::agent::TtyWinResizeRequest, ) -> ttrpc::Result { let cid = req.container_id.clone(); @@ -868,9 +874,9 @@ impl protocols::agent_ttrpc::AgentService for agentService { Ok(Empty::new()) } - fn update_interface( + async fn update_interface( &self, - _ctx: &ttrpc::TtrpcContext, + _ctx: &TtrpcContext, req: protocols::agent::UpdateInterfaceRequest, ) -> ttrpc::Result { if req.interface.is_none() { @@ -899,9 +905,9 @@ impl protocols::agent_ttrpc::AgentService for agentService { Ok(iface) } - fn update_routes( + async fn update_routes( &self, - _ctx: &ttrpc::TtrpcContext, + _ctx: &TtrpcContext, req: protocols::agent::UpdateRoutesRequest, ) -> ttrpc::Result { let mut routes = protocols::agent::Routes::new(); @@ -938,9 +944,9 @@ impl protocols::agent_ttrpc::AgentService for agentService { Ok(routes) } - fn list_interfaces( + async fn list_interfaces( &self, - _ctx: &ttrpc::TtrpcContext, + _ctx: &TtrpcContext, _req: protocols::agent::ListInterfacesRequest, ) -> ttrpc::Result { let mut interface = protocols::agent::Interfaces::new(); @@ -961,9 +967,9 @@ impl protocols::agent_ttrpc::AgentService for agentService { Ok(interface) } - fn list_routes( + async fn list_routes( &self, - _ctx: &ttrpc::TtrpcContext, + _ctx: &TtrpcContext, _req: protocols::agent::ListRoutesRequest, ) -> ttrpc::Result { let mut routes = protocols::agent::Routes::new(); @@ -985,26 +991,26 @@ impl protocols::agent_ttrpc::AgentService for agentService { Ok(routes) } - fn start_tracing( + async fn start_tracing( &self, - _ctx: &ttrpc::TtrpcContext, + _ctx: &TtrpcContext, req: protocols::agent::StartTracingRequest, ) -> ttrpc::Result { info!(sl!(), "start_tracing {:?}", req); Ok(Empty::new()) } - fn stop_tracing( + async fn stop_tracing( &self, - _ctx: &ttrpc::TtrpcContext, + _ctx: &TtrpcContext, _req: protocols::agent::StopTracingRequest, ) -> ttrpc::Result { Ok(Empty::new()) } - fn create_sandbox( + async fn create_sandbox( &self, - _ctx: &ttrpc::TtrpcContext, + _ctx: &TtrpcContext, req: protocols::agent::CreateSandboxRequest, ) -> ttrpc::Result { { @@ -1064,9 +1070,9 @@ impl protocols::agent_ttrpc::AgentService for agentService { Ok(Empty::new()) } - fn destroy_sandbox( + async fn destroy_sandbox( &self, - _ctx: &ttrpc::TtrpcContext, + _ctx: &TtrpcContext, _req: protocols::agent::DestroySandboxRequest, ) -> ttrpc::Result { let s = Arc::clone(&self.sandbox); @@ -1081,9 +1087,9 @@ impl protocols::agent_ttrpc::AgentService for agentService { Ok(Empty::new()) } - fn add_arp_neighbors( + async fn add_arp_neighbors( &self, - _ctx: &ttrpc::TtrpcContext, + _ctx: &TtrpcContext, req: protocols::agent::AddARPNeighborsRequest, ) -> ttrpc::Result { if req.neighbors.is_none() { @@ -1110,9 +1116,9 @@ impl protocols::agent_ttrpc::AgentService for agentService { Ok(Empty::new()) } - fn online_cpu_mem( + async fn online_cpu_mem( &self, - _ctx: &ttrpc::TtrpcContext, + _ctx: &TtrpcContext, req: protocols::agent::OnlineCPUMemRequest, ) -> ttrpc::Result { let s = Arc::clone(&self.sandbox); @@ -1125,9 +1131,9 @@ impl protocols::agent_ttrpc::AgentService for agentService { Ok(Empty::new()) } - fn reseed_random_dev( + async fn reseed_random_dev( &self, - _ctx: &ttrpc::TtrpcContext, + _ctx: &TtrpcContext, req: protocols::agent::ReseedRandomDevRequest, ) -> ttrpc::Result { random::reseed_rng(req.data.as_slice()) @@ -1136,9 +1142,9 @@ impl protocols::agent_ttrpc::AgentService for agentService { Ok(Empty::new()) } - fn get_guest_details( + async fn get_guest_details( &self, - _ctx: &ttrpc::TtrpcContext, + _ctx: &TtrpcContext, req: protocols::agent::GuestDetailsRequest, ) -> ttrpc::Result { info!(sl!(), "get guest details!"); @@ -1162,9 +1168,9 @@ impl protocols::agent_ttrpc::AgentService for agentService { Ok(resp) } - fn mem_hotplug_by_probe( + async fn mem_hotplug_by_probe( &self, - _ctx: &ttrpc::TtrpcContext, + _ctx: &TtrpcContext, req: protocols::agent::MemHotplugByProbeRequest, ) -> ttrpc::Result { do_mem_hotplug_by_probe(&req.memHotplugProbeAddr) @@ -1173,9 +1179,9 @@ impl protocols::agent_ttrpc::AgentService for agentService { Ok(Empty::new()) } - fn set_guest_date_time( + async fn set_guest_date_time( &self, - _ctx: &ttrpc::TtrpcContext, + _ctx: &TtrpcContext, req: protocols::agent::SetGuestDateTimeRequest, ) -> ttrpc::Result { do_set_guest_date_time(req.Sec, req.Usec) @@ -1184,9 +1190,9 @@ impl protocols::agent_ttrpc::AgentService for agentService { Ok(Empty::new()) } - fn copy_file( + async fn copy_file( &self, - _ctx: &ttrpc::TtrpcContext, + _ctx: &TtrpcContext, req: protocols::agent::CopyFileRequest, ) -> ttrpc::Result { do_copy_file(&req).map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string()))?; @@ -1194,9 +1200,9 @@ impl protocols::agent_ttrpc::AgentService for agentService { Ok(Empty::new()) } - fn get_metrics( + async fn get_metrics( &self, - _ctx: &ttrpc::TtrpcContext, + _ctx: &TtrpcContext, req: protocols::agent::GetMetricsRequest, ) -> ttrpc::Result { match get_metrics(&req) { @@ -1209,9 +1215,9 @@ impl protocols::agent_ttrpc::AgentService for agentService { } } - fn get_oom_event( + async fn get_oom_event( &self, - _ctx: &ttrpc::TtrpcContext, + _ctx: &TtrpcContext, _req: protocols::agent::GetOOMEventRequest, ) -> ttrpc::Result { let sandbox = self.sandbox.clone(); @@ -1235,10 +1241,12 @@ impl protocols::agent_ttrpc::AgentService for agentService { #[derive(Clone)] struct healthService; + +#[async_trait] impl protocols::health_ttrpc::Health for healthService { - fn check( + async fn check( &self, - _ctx: &ttrpc::TtrpcContext, + _ctx: &TtrpcContext, _req: protocols::health::CheckRequest, ) -> ttrpc::Result { let mut resp = HealthCheckResponse::new(); @@ -1247,9 +1255,9 @@ impl protocols::health_ttrpc::Health for healthService { Ok(resp) } - fn version( + async fn version( &self, - _ctx: &ttrpc::TtrpcContext, + _ctx: &TtrpcContext, req: protocols::health::CheckRequest, ) -> ttrpc::Result { info!(sl!(), "version {:?}", req); @@ -1376,7 +1384,7 @@ fn find_process<'a>( ctr.get_process(eid).map_err(|_| anyhow!("Invalid exec id")) } -pub fn start(s: Arc>, server_address: &str) -> ttrpc::Server { +pub async fn start(s: Arc>, server_address: &str) -> Result { let agent_service = Box::new(agentService { sandbox: s }) as Box; @@ -1390,15 +1398,14 @@ pub fn start(s: Arc>, server_address: &str) -> ttrpc::Server { let hservice = protocols::health_ttrpc::create_health(health_worker); - let server = ttrpc::Server::new() - .bind(server_address) - .unwrap() + let server = TtrpcServer::new() + .bind(server_address)? .register_service(aservice) .register_service(hservice); info!(sl!(), "ttRPC server started"; "address" => server_address); - server + Ok(server) } // This function updates the container namespaces configuration based on the @@ -1679,23 +1686,13 @@ mod tests { use super::*; use crate::protocols::agent_ttrpc::AgentService; use oci::{Hook, Hooks}; - use std::sync::mpsc::{Receiver, Sender}; - use ttrpc::{MessageHeader, TtrpcContext}; + use ttrpc::{r#async::TtrpcContext, MessageHeader}; - type Message = (MessageHeader, Vec); - - fn mk_ttrpc_context() -> (TtrpcContext, Receiver) { - let mh = MessageHeader::default(); - - let (tx, rx): (Sender, Receiver) = channel(); - - let ctx = TtrpcContext { + fn mk_ttrpc_context() -> TtrpcContext { + TtrpcContext { fd: -1, - mh, - res_tx: tx, - }; - - (ctx, rx) + mh: MessageHeader::default(), + } } #[test] @@ -1737,8 +1734,8 @@ mod tests { assert_eq!(s.hooks, oci.hooks); } - #[test] - fn test_update_interface() { + #[tokio::test] + async fn test_update_interface() { let logger = slog::Logger::root(slog::Discard, o!()); let sandbox = Sandbox::new(&logger).unwrap(); @@ -1747,15 +1744,15 @@ mod tests { }); let req = protocols::agent::UpdateInterfaceRequest::default(); - let (ctx, _) = mk_ttrpc_context(); + let ctx = mk_ttrpc_context(); - let result = agent_service.update_interface(&ctx, req); + let result = agent_service.update_interface(&ctx, req).await; assert!(result.is_err(), "expected update interface to fail"); } - #[test] - fn test_update_routes() { + #[tokio::test] + async fn test_update_routes() { let logger = slog::Logger::root(slog::Discard, o!()); let sandbox = Sandbox::new(&logger).unwrap(); @@ -1764,15 +1761,15 @@ mod tests { }); let req = protocols::agent::UpdateRoutesRequest::default(); - let (ctx, _) = mk_ttrpc_context(); + let ctx = mk_ttrpc_context(); - let result = agent_service.update_routes(&ctx, req); + let result = agent_service.update_routes(&ctx, req).await; assert!(result.is_err(), "expected update routes to fail"); } - #[test] - fn test_add_arp_neighbors() { + #[tokio::test] + async fn test_add_arp_neighbors() { let logger = slog::Logger::root(slog::Discard, o!()); let sandbox = Sandbox::new(&logger).unwrap(); @@ -1781,9 +1778,9 @@ mod tests { }); let req = protocols::agent::AddARPNeighborsRequest::default(); - let (ctx, _) = mk_ttrpc_context(); + let ctx = mk_ttrpc_context(); - let result = agent_service.add_arp_neighbors(&ctx, req); + let result = agent_service.add_arp_neighbors(&ctx, req).await; assert!(result.is_err(), "expected add arp neighbors to fail"); }