agent: Initial switch to async runtime

This commit includes minimal changes in order to switch to Tokio:
- Update protocol crate to generate async server code
- Adds async entry point to the Agent
- Updates agent services signatures in rpc.rs

Fixes: #1209

Signed-off-by: Maksym Pavlenko <pavlenko.maksym@gmail.com>
This commit is contained in:
Maksym Pavlenko 2020-12-18 23:52:34 -08:00 committed by Tim Zhang
parent a6d52d3da1
commit 5561755e3c
6 changed files with 484 additions and 144 deletions

378
src/agent/Cargo.lock generated
View File

@ -54,6 +54,17 @@ version = "0.5.1"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "cff77d8686867eceff3105329d4698d96c2391c176d5d03adc90c7389162b5b8" checksum = "cff77d8686867eceff3105329d4698d96c2391c176d5d03adc90c7389162b5b8"
[[package]]
name = "async-trait"
version = "0.1.42"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8d3a45e77e34375a7923b1e8febb049bb011f064714a8e17a1a616fef01da13d"
dependencies = [
"proc-macro2 1.0.24",
"quote 1.0.7",
"syn 1.0.45",
]
[[package]] [[package]]
name = "autocfg" name = "autocfg"
version = "1.0.1" version = "1.0.1"
@ -113,6 +124,12 @@ dependencies = [
"iovec", "iovec",
] ]
[[package]]
name = "bytes"
version = "0.5.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "0e4cec68f03f32e44924783795810fa50a7035d8c8ebe78580ad7e6c703fba38"
[[package]] [[package]]
name = "caps" name = "caps"
version = "0.3.4" version = "0.3.4"
@ -165,7 +182,7 @@ dependencies = [
"num-integer", "num-integer",
"num-traits", "num-traits",
"time", "time",
"winapi", "winapi 0.3.9",
] ]
[[package]] [[package]]
@ -241,7 +258,7 @@ checksum = "8e93d7f5705de3e49895a2b5e0b8855a1c27f080192ae9c32a6432d50741a57a"
dependencies = [ dependencies = [
"libc", "libc",
"redox_users", "redox_users",
"winapi", "winapi 0.3.9",
] ]
[[package]] [[package]]
@ -268,7 +285,7 @@ checksum = "6eab5ee3df98a279d9b316b1af6ac95422127b1290317e6d18c1743c99418b01"
dependencies = [ dependencies = [
"errno-dragonfly", "errno-dragonfly",
"libc", "libc",
"winapi", "winapi 0.3.9",
] ]
[[package]] [[package]]
@ -312,10 +329,115 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1" checksum = "3f9eec918d3f24069decb9af1554cad7c880e2da24a9afd88aca000531ab82c1"
[[package]] [[package]]
name = "futures" name = "fuchsia-zircon"
version = "0.1.30" version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4c7e4c2612746b0df8fed4ce0c69156021b704c9aefa360311c04e6e9e002eed" checksum = "2e9763c69ebaae630ba35f74888db465e49e259ba1bc0eda7d06f4a067615d82"
dependencies = [
"bitflags",
"fuchsia-zircon-sys",
]
[[package]]
name = "fuchsia-zircon-sys"
version = "0.3.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3dcaa9ae7725d12cdb85b3ad99a434db70b468c09ded17e012d86b5c1010f7a7"
[[package]]
name = "futures"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9b3b0c040a1fe6529d30b3c5944b280c7f0dcb2930d2c3062bca967b602583d0"
dependencies = [
"futures-channel",
"futures-core",
"futures-executor",
"futures-io",
"futures-sink",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-channel"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4b7109687aa4e177ef6fe84553af6280ef2778bdb7783ba44c9dc3399110fe64"
dependencies = [
"futures-core",
"futures-sink",
]
[[package]]
name = "futures-core"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "847ce131b72ffb13b6109a221da9ad97a64cbe48feb1028356b836b47b8f1748"
[[package]]
name = "futures-executor"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4caa2b2b68b880003057c1dd49f1ed937e38f22fcf6c212188a121f08cf40a65"
dependencies = [
"futures-core",
"futures-task",
"futures-util",
]
[[package]]
name = "futures-io"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "611834ce18aaa1bd13c4b374f5d653e1027cf99b6b502584ff8c9a64413b30bb"
[[package]]
name = "futures-macro"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "77408a692f1f97bcc61dc001d752e00643408fbc922e4d634c655df50d595556"
dependencies = [
"proc-macro-hack",
"proc-macro2 1.0.24",
"quote 1.0.7",
"syn 1.0.45",
]
[[package]]
name = "futures-sink"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f878195a49cee50e006b02b93cf7e0a95a38ac7b776b4c4d9cc1207cd20fcb3d"
[[package]]
name = "futures-task"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7c554eb5bf48b2426c4771ab68c6b14468b6e76cc90996f528c3338d761a4d0d"
dependencies = [
"once_cell",
]
[[package]]
name = "futures-util"
version = "0.3.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d304cff4a7b99cfb7986f7d43fbe93d175e72e704a8860787cc95e9ffd85cbd2"
dependencies = [
"futures-channel",
"futures-core",
"futures-io",
"futures-macro",
"futures-sink",
"futures-task",
"memchr",
"pin-project",
"pin-utils",
"proc-macro-hack",
"proc-macro-nested",
"slab",
]
[[package]] [[package]]
name = "gcc" name = "gcc"
@ -349,6 +471,15 @@ dependencies = [
"unicode-segmentation", "unicode-segmentation",
] ]
[[package]]
name = "hermit-abi"
version = "0.1.17"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "5aca5565f760fb5b220e499d72710ed156fdb74e631659e99377d9ebfbd13ae8"
dependencies = [
"libc",
]
[[package]] [[package]]
name = "hex" name = "hex"
version = "0.4.2" version = "0.4.2"
@ -384,6 +515,7 @@ name = "kata-agent"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"anyhow", "anyhow",
"async-trait",
"cgroups-rs", "cgroups-rs",
"lazy_static", "lazy_static",
"libc", "libc",
@ -407,9 +539,20 @@ dependencies = [
"slog-scope", "slog-scope",
"slog-stdlog", "slog-stdlog",
"tempfile", "tempfile",
"tokio",
"ttrpc", "ttrpc",
] ]
[[package]]
name = "kernel32-sys"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "7507624b29483431c0ba2d82aece8ca6cdba9382bff4ddd0f7490560c056098d"
dependencies = [
"winapi 0.2.8",
"winapi-build",
]
[[package]] [[package]]
name = "lazy_static" name = "lazy_static"
version = "1.4.0" version = "1.4.0"
@ -491,12 +634,65 @@ dependencies = [
"autocfg", "autocfg",
] ]
[[package]]
name = "mio"
version = "0.6.23"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "4afd66f5b91bf2a3bc13fad0e21caedac168ca4c707504e75585648ae80e4cc4"
dependencies = [
"cfg-if 0.1.10",
"fuchsia-zircon",
"fuchsia-zircon-sys",
"iovec",
"kernel32-sys",
"libc",
"log",
"miow",
"net2",
"slab",
"winapi 0.2.8",
]
[[package]]
name = "mio-uds"
version = "0.6.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "afcb699eb26d4332647cc848492bbc15eafb26f08d0304550d5aa1f612e066f0"
dependencies = [
"iovec",
"libc",
"mio",
]
[[package]]
name = "miow"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ebd808424166322d4a38da87083bfddd3ac4c131334ed55856112eb06d46944d"
dependencies = [
"kernel32-sys",
"net2",
"winapi 0.2.8",
"ws2_32-sys",
]
[[package]] [[package]]
name = "multimap" name = "multimap"
version = "0.4.0" version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2eb04b9f127583ed176e163fb9ec6f3e793b87e21deedd5734a69386a18a0151" checksum = "2eb04b9f127583ed176e163fb9ec6f3e793b87e21deedd5734a69386a18a0151"
[[package]]
name = "net2"
version = "0.2.37"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "391630d12b68002ae1e25e8f974306474966550ad82dac6886fb8910c19568ae"
dependencies = [
"cfg-if 0.1.10",
"libc",
"winapi 0.3.9",
]
[[package]] [[package]]
name = "netlink" name = "netlink"
version = "0.1.0" version = "0.1.0"
@ -584,6 +780,16 @@ dependencies = [
"autocfg", "autocfg",
] ]
[[package]]
name = "num_cpus"
version = "1.13.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "05499f3756671c15885fee9034446956fff3f243d6077b91e5767df161f766b3"
dependencies = [
"hermit-abi",
"libc",
]
[[package]] [[package]]
name = "object" name = "object"
version = "0.21.1" version = "0.21.1"
@ -600,6 +806,12 @@ dependencies = [
"serde_json", "serde_json",
] ]
[[package]]
name = "once_cell"
version = "1.5.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "13bd41f508810a131401606d54ac32a467c97172d74ba7662562ebba5ad07fa0"
[[package]] [[package]]
name = "parking_lot" name = "parking_lot"
version = "0.10.2" version = "0.10.2"
@ -621,7 +833,7 @@ dependencies = [
"libc", "libc",
"redox_syscall", "redox_syscall",
"smallvec", "smallvec",
"winapi", "winapi 0.3.9",
] ]
[[package]] [[package]]
@ -652,6 +864,38 @@ dependencies = [
"fixedbitset", "fixedbitset",
] ]
[[package]]
name = "pin-project"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "9ccc2237c2c489783abd8c4c80e5450fc0e98644555b1364da68cc29aa151ca7"
dependencies = [
"pin-project-internal",
]
[[package]]
name = "pin-project-internal"
version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "f8e8d2bf0b23038a4424865103a4df472855692821aab4e4f5c3312d461d9e5f"
dependencies = [
"proc-macro2 1.0.24",
"quote 1.0.7",
"syn 1.0.45",
]
[[package]]
name = "pin-project-lite"
version = "0.1.11"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c917123afa01924fc84bb20c4c03f004d9c38e5127e3c039bbf7f4b9c76a2f6b"
[[package]]
name = "pin-utils"
version = "0.1.0"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
[[package]] [[package]]
name = "ppv-lite86" name = "ppv-lite86"
version = "0.2.9" version = "0.2.9"
@ -668,6 +912,18 @@ dependencies = [
"nix 0.19.0", "nix 0.19.0",
] ]
[[package]]
name = "proc-macro-hack"
version = "0.5.19"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dbf0c48bc1d91375ae5c3cd81e3722dff1abcf81a30960240640d223f59fe0e5"
[[package]]
name = "proc-macro-nested"
version = "0.1.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eba180dafb9038b050a4c280019bbedf9f2467b61e5d892dcad585bb57aadc5a"
[[package]] [[package]]
name = "proc-macro2" name = "proc-macro2"
version = "0.4.30" version = "0.4.30"
@ -736,7 +992,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "96d14b1c185652833d24aaad41c5832b0be5616a590227c1fbff57c616754b23" checksum = "96d14b1c185652833d24aaad41c5832b0be5616a590227c1fbff57c616754b23"
dependencies = [ dependencies = [
"byteorder", "byteorder",
"bytes", "bytes 0.4.12",
"prost-derive", "prost-derive",
] ]
@ -746,7 +1002,7 @@ version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "eb788126ea840817128183f8f603dce02cb7aea25c2a0b764359d8e20010702e" checksum = "eb788126ea840817128183f8f603dce02cb7aea25c2a0b764359d8e20010702e"
dependencies = [ dependencies = [
"bytes", "bytes 0.4.12",
"heck", "heck",
"itertools", "itertools",
"log", "log",
@ -777,7 +1033,7 @@ version = "0.5.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "1de482a366941c8d56d19b650fac09ca08508f2a696119ee7513ad590c8bac6f" checksum = "1de482a366941c8d56d19b650fac09ca08508f2a696119ee7513ad590c8bac6f"
dependencies = [ dependencies = [
"bytes", "bytes 0.4.12",
"prost", "prost",
] ]
@ -810,7 +1066,7 @@ dependencies = [
name = "protocols" name = "protocols"
version = "0.1.0" version = "0.1.0"
dependencies = [ dependencies = [
"futures", "async-trait",
"protobuf", "protobuf",
"ttrpc", "ttrpc",
"ttrpc-codegen", "ttrpc-codegen",
@ -916,7 +1172,7 @@ version = "0.5.3"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "3acd125665422973a33ac9d3dd2df85edad0f4ae9b00dafb1a05e43a9f5ef8e7" checksum = "3acd125665422973a33ac9d3dd2df85edad0f4ae9b00dafb1a05e43a9f5ef8e7"
dependencies = [ dependencies = [
"winapi", "winapi 0.3.9",
] ]
[[package]] [[package]]
@ -1087,6 +1343,12 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "slab"
version = "0.4.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "c111b5bd5695e56cffe5129854aa230b39c93a305372fdbb2668ca2394eea9f8"
[[package]] [[package]]
name = "slash-formatter" name = "slash-formatter"
version = "2.2.6" version = "2.2.6"
@ -1196,7 +1458,7 @@ dependencies = [
"rand", "rand",
"redox_syscall", "redox_syscall",
"remove_dir_all", "remove_dir_all",
"winapi", "winapi 0.3.9",
] ]
[[package]] [[package]]
@ -1236,21 +1498,73 @@ checksum = "6db9e6914ab8b1ae1c260a4ae7a49b6c5611b40328a735b21862567685e73255"
dependencies = [ dependencies = [
"libc", "libc",
"wasi 0.10.0+wasi-snapshot-preview1", "wasi 0.10.0+wasi-snapshot-preview1",
"winapi", "winapi 0.3.9",
]
[[package]]
name = "tokio"
version = "0.2.24"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "099837d3464c16a808060bb3f02263b412f6fafcb5d01c533d309985fbeebe48"
dependencies = [
"bytes 0.5.6",
"fnv",
"futures-core",
"lazy_static",
"libc",
"memchr",
"mio",
"mio-uds",
"num_cpus",
"pin-project-lite",
"slab",
"tokio-macros",
]
[[package]]
name = "tokio-macros"
version = "0.2.6"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "e44da00bfc73a25f814cd8d7e57a68a5c31b74b3152a0a1d1f590c97ed06265a"
dependencies = [
"proc-macro2 1.0.24",
"quote 1.0.7",
"syn 1.0.45",
]
[[package]]
name = "tokio-vsock"
version = "0.2.2"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "547a35667d4d842422da7f5528612321770f2f640e7fd5df0431de7b717fb2b4"
dependencies = [
"bytes 0.4.12",
"futures",
"iovec",
"libc",
"mio",
"nix 0.17.0",
"tokio",
"vsock",
] ]
[[package]] [[package]]
name = "ttrpc" name = "ttrpc"
version = "0.3.0" version = "0.4.13"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "bfa9da24c351f0feef5e66c0b28c18373a7ef3e1bfdfd5852170de494f9bf870" checksum = "d6e99ffa09e7fbe514b58b01bd17d71e3ed4dd27c588afa43d41ec0b7fc90b0a"
dependencies = [ dependencies = [
"async-trait",
"byteorder", "byteorder",
"futures",
"libc", "libc",
"log", "log",
"nix 0.16.1", "nix 0.16.1",
"protobuf", "protobuf",
"protobuf-codegen-pure", "protobuf-codegen-pure",
"thiserror",
"tokio",
"tokio-vsock",
] ]
[[package]] [[package]]
@ -1310,6 +1624,16 @@ version = "1.0.2"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d" checksum = "6a02e4885ed3bc0f2de90ea6dd45ebcbb66dacffe03547fadbb0eeae2770887d"
[[package]]
name = "vsock"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "dba063357047c0f2216c7c653879ea4e5e198d0c3cde7efa37ebfd9039b48491"
dependencies = [
"libc",
"nix 0.17.0",
]
[[package]] [[package]]
name = "wasi" name = "wasi"
version = "0.9.0+wasi-snapshot-preview1" version = "0.9.0+wasi-snapshot-preview1"
@ -1332,6 +1656,12 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "winapi"
version = "0.2.8"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "167dc9d6949a9b857f3451275e911c3f44255842c1f7a76f33c55103a909087a"
[[package]] [[package]]
name = "winapi" name = "winapi"
version = "0.3.9" version = "0.3.9"
@ -1342,6 +1672,12 @@ dependencies = [
"winapi-x86_64-pc-windows-gnu", "winapi-x86_64-pc-windows-gnu",
] ]
[[package]]
name = "winapi-build"
version = "0.1.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "2d315eee3b34aca4797b2da6b13ed88266e6d612562a0c46390af8299fc699bc"
[[package]] [[package]]
name = "winapi-i686-pc-windows-gnu" name = "winapi-i686-pc-windows-gnu"
version = "0.4.0" version = "0.4.0"
@ -1353,3 +1689,13 @@ name = "winapi-x86_64-pc-windows-gnu"
version = "0.4.0" version = "0.4.0"
source = "registry+https://github.com/rust-lang/crates.io-index" source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f" checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"
[[package]]
name = "ws2_32-sys"
version = "0.2.1"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "d59cefebd0c892fa2dd6de581e937301d8552cb44489cdff035c6187cb63fa5e"
dependencies = [
"winapi 0.2.8",
"winapi-build",
]

View File

@ -11,7 +11,7 @@ rustjail = { path = "rustjail" }
protocols = { path = "protocols" } protocols = { path = "protocols" }
netlink = { path = "netlink", features = ["with-log", "with-agent-handler"] } netlink = { path = "netlink", features = ["with-log", "with-agent-handler"] }
lazy_static = "1.3.0" lazy_static = "1.3.0"
ttrpc = "0.3.0" ttrpc = { version="0.4.13", features=["async"] }
protobuf = "=2.14.0" protobuf = "=2.14.0"
libc = "0.2.58" libc = "0.2.58"
nix = "0.17.0" nix = "0.17.0"
@ -21,6 +21,8 @@ signal-hook = "0.1.9"
scan_fmt = "0.2.3" scan_fmt = "0.2.3"
scopeguard = "1.0.0" scopeguard = "1.0.0"
regex = "1" regex = "1"
tokio = { version="0.2", features = ["macros", "rt-threaded"] }
async-trait = "0.1.42"
# slog: # slog:
# - Dynamic keys required to allow HashMap keys to be slog::Serialized. # - Dynamic keys required to allow HashMap keys to be slog::Serialized.

View File

@ -5,9 +5,9 @@ authors = ["The Kata Containers community <kata-dev@lists.katacontainers.io>"]
edition = "2018" edition = "2018"
[dependencies] [dependencies]
ttrpc = "0.3.0" ttrpc = { version="0.4.13", features=["async"] }
async-trait = "0.1.42"
protobuf = "=2.14.0" protobuf = "=2.14.0"
futures = "0.1.27"
[build-dependencies] [build-dependencies]
ttrpc-codegen = "0.1.2" ttrpc-codegen = "0.1.2"

View File

@ -3,8 +3,8 @@
// SPDX-License-Identifier: Apache-2.0 // SPDX-License-Identifier: Apache-2.0
// //
use std::fs::File; use std::fs;
use std::io::{Read, Write}; use ttrpc_codegen::{Codegen, Customize};
fn main() { fn main() {
let protos = vec![ let protos = vec![
@ -18,13 +18,17 @@ fn main() {
// Tell Cargo that if the .proto files changed, to rerun this build script. // Tell Cargo that if the .proto files changed, to rerun this build script.
protos protos
.iter() .iter()
.for_each(|p| println!("cargo:rerun-if-changed={}", &p)); .for_each(|p| println!("cargo:rerun-if-changed={}", p));
ttrpc_codegen::Codegen::new() Codegen::new()
.out_dir("src") .out_dir("src")
.inputs(&protos) .inputs(&protos)
.include("protos") .include("protos")
.rust_protobuf() .rust_protobuf()
.customize(Customize {
async_server: true,
..Default::default()
})
.run() .run()
.expect("Gen codes failed."); .expect("Gen codes failed.");
@ -40,16 +44,6 @@ fn main() {
} }
fn replace_text_in_file(file_name: &str, from: &str, to: &str) -> Result<(), std::io::Error> { fn replace_text_in_file(file_name: &str, from: &str, to: &str) -> Result<(), std::io::Error> {
let mut src = File::open(file_name)?; let new_contents = fs::read_to_string(file_name)?.replace(from, to);
let mut contents = String::new(); fs::write(&file_name, new_contents.as_bytes())
src.read_to_string(&mut contents).unwrap();
drop(src);
let new_contents = contents.replace(from, to);
let mut dst = File::create(&file_name)?;
dst.write_all(new_contents.as_bytes())?;
Ok(())
} }

View File

@ -100,7 +100,8 @@ fn announce(logger: &Logger, config: &agentConfig) {
); );
} }
fn main() -> Result<()> { #[tokio::main]
async fn main() -> Result<()> {
let args: Vec<String> = env::args().collect(); let args: Vec<String> = env::args().collect();
if args.len() == 2 && args[1] == "--version" { if args.len() == 2 && args[1] == "--version" {
@ -205,14 +206,14 @@ fn main() -> Result<()> {
_log_guard = Ok(slog_stdlog::init().map_err(|e| e)?); _log_guard = Ok(slog_stdlog::init().map_err(|e| e)?);
} }
start_sandbox(&logger, &config, init_mode)?; start_sandbox(&logger, &config, init_mode).await?;
let _ = log_handle.join(); let _ = log_handle.join();
Ok(()) Ok(())
} }
fn start_sandbox(logger: &Logger, config: &agentConfig, init_mode: bool) -> Result<()> { async fn start_sandbox(logger: &Logger, config: &agentConfig, init_mode: bool) -> Result<()> {
let shells = SHELLS.clone(); let shells = SHELLS.clone();
let debug_console_vport = config.debug_console_vport as u32; let debug_console_vport = config.debug_console_vport as u32;
@ -254,13 +255,13 @@ fn start_sandbox(logger: &Logger, config: &agentConfig, init_mode: bool) -> Resu
sandbox.lock().unwrap().sender = Some(tx); sandbox.lock().unwrap().sender = Some(tx);
// vsock:///dev/vsock, port // vsock:///dev/vsock, port
let mut server = rpc::start(sandbox, config.server_addr.as_str()); let mut server = rpc::start(sandbox, config.server_addr.as_str()).await?;
let _ = server.start().unwrap(); server.start().await?;
let _ = rx.recv()?; let _ = rx.recv()?;
server.shutdown(); server.shutdown().await?;
if let Some(handle) = shell_handle { if let Some(handle) = shell_handle {
handle.join().map_err(|e| anyhow!("{:?}", e))?; handle.join().map_err(|e| anyhow!("{:?}", e))?;

View File

@ -3,10 +3,15 @@
// SPDX-License-Identifier: Apache-2.0 // SPDX-License-Identifier: Apache-2.0
// //
use async_trait::async_trait;
use std::path::Path; use std::path::Path;
use std::sync::mpsc::channel; use std::sync::mpsc::channel;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use ttrpc::{self, error::get_rpc_status as ttrpc_error}; use ttrpc::{
self,
error::get_rpc_status as ttrpc_error,
r#async::{Server as TtrpcServer, TtrpcContext},
};
use anyhow::{anyhow, Context, Result}; use anyhow::{anyhow, Context, Result};
use oci::{LinuxNamespace, Root, Spec}; use oci::{LinuxNamespace, Root, Spec};
@ -524,10 +529,11 @@ impl agentService {
} }
} }
#[async_trait]
impl protocols::agent_ttrpc::AgentService for agentService { impl protocols::agent_ttrpc::AgentService for agentService {
fn create_container( async fn create_container(
&self, &self,
_ctx: &ttrpc::TtrpcContext, _ctx: &TtrpcContext,
req: protocols::agent::CreateContainerRequest, req: protocols::agent::CreateContainerRequest,
) -> ttrpc::Result<Empty> { ) -> ttrpc::Result<Empty> {
match self.do_create_container(req) { match self.do_create_container(req) {
@ -536,9 +542,9 @@ impl protocols::agent_ttrpc::AgentService for agentService {
} }
} }
fn start_container( async fn start_container(
&self, &self,
_ctx: &ttrpc::TtrpcContext, _ctx: &TtrpcContext,
req: protocols::agent::StartContainerRequest, req: protocols::agent::StartContainerRequest,
) -> ttrpc::Result<Empty> { ) -> ttrpc::Result<Empty> {
match self.do_start_container(req) { match self.do_start_container(req) {
@ -547,9 +553,9 @@ impl protocols::agent_ttrpc::AgentService for agentService {
} }
} }
fn remove_container( async fn remove_container(
&self, &self,
_ctx: &ttrpc::TtrpcContext, _ctx: &TtrpcContext,
req: protocols::agent::RemoveContainerRequest, req: protocols::agent::RemoveContainerRequest,
) -> ttrpc::Result<Empty> { ) -> ttrpc::Result<Empty> {
match self.do_remove_container(req) { match self.do_remove_container(req) {
@ -558,9 +564,9 @@ impl protocols::agent_ttrpc::AgentService for agentService {
} }
} }
fn exec_process( async fn exec_process(
&self, &self,
_ctx: &ttrpc::TtrpcContext, _ctx: &TtrpcContext,
req: protocols::agent::ExecProcessRequest, req: protocols::agent::ExecProcessRequest,
) -> ttrpc::Result<Empty> { ) -> ttrpc::Result<Empty> {
match self.do_exec_process(req) { match self.do_exec_process(req) {
@ -569,9 +575,9 @@ impl protocols::agent_ttrpc::AgentService for agentService {
} }
} }
fn signal_process( async fn signal_process(
&self, &self,
_ctx: &ttrpc::TtrpcContext, _ctx: &TtrpcContext,
req: protocols::agent::SignalProcessRequest, req: protocols::agent::SignalProcessRequest,
) -> ttrpc::Result<Empty> { ) -> ttrpc::Result<Empty> {
match self.do_signal_process(req) { match self.do_signal_process(req) {
@ -580,18 +586,18 @@ impl protocols::agent_ttrpc::AgentService for agentService {
} }
} }
fn wait_process( async fn wait_process(
&self, &self,
_ctx: &ttrpc::TtrpcContext, _ctx: &TtrpcContext,
req: protocols::agent::WaitProcessRequest, req: protocols::agent::WaitProcessRequest,
) -> ttrpc::Result<WaitProcessResponse> { ) -> ttrpc::Result<WaitProcessResponse> {
self.do_wait_process(req) self.do_wait_process(req)
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())) .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string()))
} }
fn list_processes( async fn list_processes(
&self, &self,
_ctx: &ttrpc::TtrpcContext, _ctx: &TtrpcContext,
req: protocols::agent::ListProcessesRequest, req: protocols::agent::ListProcessesRequest,
) -> ttrpc::Result<ListProcessesResponse> { ) -> ttrpc::Result<ListProcessesResponse> {
let cid = req.container_id.clone(); let cid = req.container_id.clone();
@ -673,9 +679,9 @@ impl protocols::agent_ttrpc::AgentService for agentService {
Ok(resp) Ok(resp)
} }
fn update_container( async fn update_container(
&self, &self,
_ctx: &ttrpc::TtrpcContext, _ctx: &TtrpcContext,
req: protocols::agent::UpdateContainerRequest, req: protocols::agent::UpdateContainerRequest,
) -> ttrpc::Result<Empty> { ) -> ttrpc::Result<Empty> {
let cid = req.container_id.clone(); let cid = req.container_id.clone();
@ -707,9 +713,9 @@ impl protocols::agent_ttrpc::AgentService for agentService {
Ok(resp) Ok(resp)
} }
fn stats_container( async fn stats_container(
&self, &self,
_ctx: &ttrpc::TtrpcContext, _ctx: &TtrpcContext,
req: protocols::agent::StatsContainerRequest, req: protocols::agent::StatsContainerRequest,
) -> ttrpc::Result<StatsContainerResponse> { ) -> ttrpc::Result<StatsContainerResponse> {
let cid = req.container_id; let cid = req.container_id;
@ -727,9 +733,9 @@ impl protocols::agent_ttrpc::AgentService for agentService {
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())) .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string()))
} }
fn pause_container( async fn pause_container(
&self, &self,
_ctx: &ttrpc::TtrpcContext, _ctx: &TtrpcContext,
req: protocols::agent::PauseContainerRequest, req: protocols::agent::PauseContainerRequest,
) -> ttrpc::Result<protocols::empty::Empty> { ) -> ttrpc::Result<protocols::empty::Empty> {
let cid = req.get_container_id(); let cid = req.get_container_id();
@ -749,9 +755,9 @@ impl protocols::agent_ttrpc::AgentService for agentService {
Ok(Empty::new()) Ok(Empty::new())
} }
fn resume_container( async fn resume_container(
&self, &self,
_ctx: &ttrpc::TtrpcContext, _ctx: &TtrpcContext,
req: protocols::agent::ResumeContainerRequest, req: protocols::agent::ResumeContainerRequest,
) -> ttrpc::Result<protocols::empty::Empty> { ) -> ttrpc::Result<protocols::empty::Empty> {
let cid = req.get_container_id(); let cid = req.get_container_id();
@ -771,36 +777,36 @@ impl protocols::agent_ttrpc::AgentService for agentService {
Ok(Empty::new()) Ok(Empty::new())
} }
fn write_stdin( async fn write_stdin(
&self, &self,
_ctx: &ttrpc::TtrpcContext, _ctx: &TtrpcContext,
req: protocols::agent::WriteStreamRequest, req: protocols::agent::WriteStreamRequest,
) -> ttrpc::Result<WriteStreamResponse> { ) -> ttrpc::Result<WriteStreamResponse> {
self.do_write_stream(req) self.do_write_stream(req)
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())) .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string()))
} }
fn read_stdout( async fn read_stdout(
&self, &self,
_ctx: &ttrpc::TtrpcContext, _ctx: &TtrpcContext,
req: protocols::agent::ReadStreamRequest, req: protocols::agent::ReadStreamRequest,
) -> ttrpc::Result<ReadStreamResponse> { ) -> ttrpc::Result<ReadStreamResponse> {
self.do_read_stream(req, true) self.do_read_stream(req, true)
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())) .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string()))
} }
fn read_stderr( async fn read_stderr(
&self, &self,
_ctx: &ttrpc::TtrpcContext, _ctx: &TtrpcContext,
req: protocols::agent::ReadStreamRequest, req: protocols::agent::ReadStreamRequest,
) -> ttrpc::Result<ReadStreamResponse> { ) -> ttrpc::Result<ReadStreamResponse> {
self.do_read_stream(req, false) self.do_read_stream(req, false)
.map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string())) .map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string()))
} }
fn close_stdin( async fn close_stdin(
&self, &self,
_ctx: &ttrpc::TtrpcContext, _ctx: &TtrpcContext,
req: protocols::agent::CloseStdinRequest, req: protocols::agent::CloseStdinRequest,
) -> ttrpc::Result<Empty> { ) -> ttrpc::Result<Empty> {
let cid = req.container_id.clone(); let cid = req.container_id.clone();
@ -830,9 +836,9 @@ impl protocols::agent_ttrpc::AgentService for agentService {
Ok(Empty::new()) Ok(Empty::new())
} }
fn tty_win_resize( async fn tty_win_resize(
&self, &self,
_ctx: &ttrpc::TtrpcContext, _ctx: &TtrpcContext,
req: protocols::agent::TtyWinResizeRequest, req: protocols::agent::TtyWinResizeRequest,
) -> ttrpc::Result<Empty> { ) -> ttrpc::Result<Empty> {
let cid = req.container_id.clone(); let cid = req.container_id.clone();
@ -868,9 +874,9 @@ impl protocols::agent_ttrpc::AgentService for agentService {
Ok(Empty::new()) Ok(Empty::new())
} }
fn update_interface( async fn update_interface(
&self, &self,
_ctx: &ttrpc::TtrpcContext, _ctx: &TtrpcContext,
req: protocols::agent::UpdateInterfaceRequest, req: protocols::agent::UpdateInterfaceRequest,
) -> ttrpc::Result<Interface> { ) -> ttrpc::Result<Interface> {
if req.interface.is_none() { if req.interface.is_none() {
@ -899,9 +905,9 @@ impl protocols::agent_ttrpc::AgentService for agentService {
Ok(iface) Ok(iface)
} }
fn update_routes( async fn update_routes(
&self, &self,
_ctx: &ttrpc::TtrpcContext, _ctx: &TtrpcContext,
req: protocols::agent::UpdateRoutesRequest, req: protocols::agent::UpdateRoutesRequest,
) -> ttrpc::Result<Routes> { ) -> ttrpc::Result<Routes> {
let mut routes = protocols::agent::Routes::new(); let mut routes = protocols::agent::Routes::new();
@ -938,9 +944,9 @@ impl protocols::agent_ttrpc::AgentService for agentService {
Ok(routes) Ok(routes)
} }
fn list_interfaces( async fn list_interfaces(
&self, &self,
_ctx: &ttrpc::TtrpcContext, _ctx: &TtrpcContext,
_req: protocols::agent::ListInterfacesRequest, _req: protocols::agent::ListInterfacesRequest,
) -> ttrpc::Result<Interfaces> { ) -> ttrpc::Result<Interfaces> {
let mut interface = protocols::agent::Interfaces::new(); let mut interface = protocols::agent::Interfaces::new();
@ -961,9 +967,9 @@ impl protocols::agent_ttrpc::AgentService for agentService {
Ok(interface) Ok(interface)
} }
fn list_routes( async fn list_routes(
&self, &self,
_ctx: &ttrpc::TtrpcContext, _ctx: &TtrpcContext,
_req: protocols::agent::ListRoutesRequest, _req: protocols::agent::ListRoutesRequest,
) -> ttrpc::Result<Routes> { ) -> ttrpc::Result<Routes> {
let mut routes = protocols::agent::Routes::new(); let mut routes = protocols::agent::Routes::new();
@ -985,26 +991,26 @@ impl protocols::agent_ttrpc::AgentService for agentService {
Ok(routes) Ok(routes)
} }
fn start_tracing( async fn start_tracing(
&self, &self,
_ctx: &ttrpc::TtrpcContext, _ctx: &TtrpcContext,
req: protocols::agent::StartTracingRequest, req: protocols::agent::StartTracingRequest,
) -> ttrpc::Result<Empty> { ) -> ttrpc::Result<Empty> {
info!(sl!(), "start_tracing {:?}", req); info!(sl!(), "start_tracing {:?}", req);
Ok(Empty::new()) Ok(Empty::new())
} }
fn stop_tracing( async fn stop_tracing(
&self, &self,
_ctx: &ttrpc::TtrpcContext, _ctx: &TtrpcContext,
_req: protocols::agent::StopTracingRequest, _req: protocols::agent::StopTracingRequest,
) -> ttrpc::Result<Empty> { ) -> ttrpc::Result<Empty> {
Ok(Empty::new()) Ok(Empty::new())
} }
fn create_sandbox( async fn create_sandbox(
&self, &self,
_ctx: &ttrpc::TtrpcContext, _ctx: &TtrpcContext,
req: protocols::agent::CreateSandboxRequest, req: protocols::agent::CreateSandboxRequest,
) -> ttrpc::Result<Empty> { ) -> ttrpc::Result<Empty> {
{ {
@ -1064,9 +1070,9 @@ impl protocols::agent_ttrpc::AgentService for agentService {
Ok(Empty::new()) Ok(Empty::new())
} }
fn destroy_sandbox( async fn destroy_sandbox(
&self, &self,
_ctx: &ttrpc::TtrpcContext, _ctx: &TtrpcContext,
_req: protocols::agent::DestroySandboxRequest, _req: protocols::agent::DestroySandboxRequest,
) -> ttrpc::Result<Empty> { ) -> ttrpc::Result<Empty> {
let s = Arc::clone(&self.sandbox); let s = Arc::clone(&self.sandbox);
@ -1081,9 +1087,9 @@ impl protocols::agent_ttrpc::AgentService for agentService {
Ok(Empty::new()) Ok(Empty::new())
} }
fn add_arp_neighbors( async fn add_arp_neighbors(
&self, &self,
_ctx: &ttrpc::TtrpcContext, _ctx: &TtrpcContext,
req: protocols::agent::AddARPNeighborsRequest, req: protocols::agent::AddARPNeighborsRequest,
) -> ttrpc::Result<Empty> { ) -> ttrpc::Result<Empty> {
if req.neighbors.is_none() { if req.neighbors.is_none() {
@ -1110,9 +1116,9 @@ impl protocols::agent_ttrpc::AgentService for agentService {
Ok(Empty::new()) Ok(Empty::new())
} }
fn online_cpu_mem( async fn online_cpu_mem(
&self, &self,
_ctx: &ttrpc::TtrpcContext, _ctx: &TtrpcContext,
req: protocols::agent::OnlineCPUMemRequest, req: protocols::agent::OnlineCPUMemRequest,
) -> ttrpc::Result<Empty> { ) -> ttrpc::Result<Empty> {
let s = Arc::clone(&self.sandbox); let s = Arc::clone(&self.sandbox);
@ -1125,9 +1131,9 @@ impl protocols::agent_ttrpc::AgentService for agentService {
Ok(Empty::new()) Ok(Empty::new())
} }
fn reseed_random_dev( async fn reseed_random_dev(
&self, &self,
_ctx: &ttrpc::TtrpcContext, _ctx: &TtrpcContext,
req: protocols::agent::ReseedRandomDevRequest, req: protocols::agent::ReseedRandomDevRequest,
) -> ttrpc::Result<Empty> { ) -> ttrpc::Result<Empty> {
random::reseed_rng(req.data.as_slice()) random::reseed_rng(req.data.as_slice())
@ -1136,9 +1142,9 @@ impl protocols::agent_ttrpc::AgentService for agentService {
Ok(Empty::new()) Ok(Empty::new())
} }
fn get_guest_details( async fn get_guest_details(
&self, &self,
_ctx: &ttrpc::TtrpcContext, _ctx: &TtrpcContext,
req: protocols::agent::GuestDetailsRequest, req: protocols::agent::GuestDetailsRequest,
) -> ttrpc::Result<GuestDetailsResponse> { ) -> ttrpc::Result<GuestDetailsResponse> {
info!(sl!(), "get guest details!"); info!(sl!(), "get guest details!");
@ -1162,9 +1168,9 @@ impl protocols::agent_ttrpc::AgentService for agentService {
Ok(resp) Ok(resp)
} }
fn mem_hotplug_by_probe( async fn mem_hotplug_by_probe(
&self, &self,
_ctx: &ttrpc::TtrpcContext, _ctx: &TtrpcContext,
req: protocols::agent::MemHotplugByProbeRequest, req: protocols::agent::MemHotplugByProbeRequest,
) -> ttrpc::Result<Empty> { ) -> ttrpc::Result<Empty> {
do_mem_hotplug_by_probe(&req.memHotplugProbeAddr) do_mem_hotplug_by_probe(&req.memHotplugProbeAddr)
@ -1173,9 +1179,9 @@ impl protocols::agent_ttrpc::AgentService for agentService {
Ok(Empty::new()) Ok(Empty::new())
} }
fn set_guest_date_time( async fn set_guest_date_time(
&self, &self,
_ctx: &ttrpc::TtrpcContext, _ctx: &TtrpcContext,
req: protocols::agent::SetGuestDateTimeRequest, req: protocols::agent::SetGuestDateTimeRequest,
) -> ttrpc::Result<Empty> { ) -> ttrpc::Result<Empty> {
do_set_guest_date_time(req.Sec, req.Usec) do_set_guest_date_time(req.Sec, req.Usec)
@ -1184,9 +1190,9 @@ impl protocols::agent_ttrpc::AgentService for agentService {
Ok(Empty::new()) Ok(Empty::new())
} }
fn copy_file( async fn copy_file(
&self, &self,
_ctx: &ttrpc::TtrpcContext, _ctx: &TtrpcContext,
req: protocols::agent::CopyFileRequest, req: protocols::agent::CopyFileRequest,
) -> ttrpc::Result<Empty> { ) -> ttrpc::Result<Empty> {
do_copy_file(&req).map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string()))?; do_copy_file(&req).map_err(|e| ttrpc_error(ttrpc::Code::INTERNAL, e.to_string()))?;
@ -1194,9 +1200,9 @@ impl protocols::agent_ttrpc::AgentService for agentService {
Ok(Empty::new()) Ok(Empty::new())
} }
fn get_metrics( async fn get_metrics(
&self, &self,
_ctx: &ttrpc::TtrpcContext, _ctx: &TtrpcContext,
req: protocols::agent::GetMetricsRequest, req: protocols::agent::GetMetricsRequest,
) -> ttrpc::Result<Metrics> { ) -> ttrpc::Result<Metrics> {
match get_metrics(&req) { match get_metrics(&req) {
@ -1209,9 +1215,9 @@ impl protocols::agent_ttrpc::AgentService for agentService {
} }
} }
fn get_oom_event( async fn get_oom_event(
&self, &self,
_ctx: &ttrpc::TtrpcContext, _ctx: &TtrpcContext,
_req: protocols::agent::GetOOMEventRequest, _req: protocols::agent::GetOOMEventRequest,
) -> ttrpc::Result<OOMEvent> { ) -> ttrpc::Result<OOMEvent> {
let sandbox = self.sandbox.clone(); let sandbox = self.sandbox.clone();
@ -1235,10 +1241,12 @@ impl protocols::agent_ttrpc::AgentService for agentService {
#[derive(Clone)] #[derive(Clone)]
struct healthService; struct healthService;
#[async_trait]
impl protocols::health_ttrpc::Health for healthService { impl protocols::health_ttrpc::Health for healthService {
fn check( async fn check(
&self, &self,
_ctx: &ttrpc::TtrpcContext, _ctx: &TtrpcContext,
_req: protocols::health::CheckRequest, _req: protocols::health::CheckRequest,
) -> ttrpc::Result<HealthCheckResponse> { ) -> ttrpc::Result<HealthCheckResponse> {
let mut resp = HealthCheckResponse::new(); let mut resp = HealthCheckResponse::new();
@ -1247,9 +1255,9 @@ impl protocols::health_ttrpc::Health for healthService {
Ok(resp) Ok(resp)
} }
fn version( async fn version(
&self, &self,
_ctx: &ttrpc::TtrpcContext, _ctx: &TtrpcContext,
req: protocols::health::CheckRequest, req: protocols::health::CheckRequest,
) -> ttrpc::Result<VersionCheckResponse> { ) -> ttrpc::Result<VersionCheckResponse> {
info!(sl!(), "version {:?}", req); info!(sl!(), "version {:?}", req);
@ -1376,7 +1384,7 @@ fn find_process<'a>(
ctr.get_process(eid).map_err(|_| anyhow!("Invalid exec id")) ctr.get_process(eid).map_err(|_| anyhow!("Invalid exec id"))
} }
pub fn start(s: Arc<Mutex<Sandbox>>, server_address: &str) -> ttrpc::Server { pub async fn start(s: Arc<Mutex<Sandbox>>, server_address: &str) -> Result<TtrpcServer> {
let agent_service = Box::new(agentService { sandbox: s }) let agent_service = Box::new(agentService { sandbox: s })
as Box<dyn protocols::agent_ttrpc::AgentService + Send + Sync>; as Box<dyn protocols::agent_ttrpc::AgentService + Send + Sync>;
@ -1390,15 +1398,14 @@ pub fn start(s: Arc<Mutex<Sandbox>>, server_address: &str) -> ttrpc::Server {
let hservice = protocols::health_ttrpc::create_health(health_worker); let hservice = protocols::health_ttrpc::create_health(health_worker);
let server = ttrpc::Server::new() let server = TtrpcServer::new()
.bind(server_address) .bind(server_address)?
.unwrap()
.register_service(aservice) .register_service(aservice)
.register_service(hservice); .register_service(hservice);
info!(sl!(), "ttRPC server started"; "address" => server_address); info!(sl!(), "ttRPC server started"; "address" => server_address);
server Ok(server)
} }
// This function updates the container namespaces configuration based on the // This function updates the container namespaces configuration based on the
@ -1679,23 +1686,13 @@ mod tests {
use super::*; use super::*;
use crate::protocols::agent_ttrpc::AgentService; use crate::protocols::agent_ttrpc::AgentService;
use oci::{Hook, Hooks}; use oci::{Hook, Hooks};
use std::sync::mpsc::{Receiver, Sender}; use ttrpc::{r#async::TtrpcContext, MessageHeader};
use ttrpc::{MessageHeader, TtrpcContext};
type Message = (MessageHeader, Vec<u8>); fn mk_ttrpc_context() -> TtrpcContext {
TtrpcContext {
fn mk_ttrpc_context() -> (TtrpcContext, Receiver<Message>) {
let mh = MessageHeader::default();
let (tx, rx): (Sender<Message>, Receiver<Message>) = channel();
let ctx = TtrpcContext {
fd: -1, fd: -1,
mh, mh: MessageHeader::default(),
res_tx: tx, }
};
(ctx, rx)
} }
#[test] #[test]
@ -1737,8 +1734,8 @@ mod tests {
assert_eq!(s.hooks, oci.hooks); assert_eq!(s.hooks, oci.hooks);
} }
#[test] #[tokio::test]
fn test_update_interface() { async fn test_update_interface() {
let logger = slog::Logger::root(slog::Discard, o!()); let logger = slog::Logger::root(slog::Discard, o!());
let sandbox = Sandbox::new(&logger).unwrap(); let sandbox = Sandbox::new(&logger).unwrap();
@ -1747,15 +1744,15 @@ mod tests {
}); });
let req = protocols::agent::UpdateInterfaceRequest::default(); let req = protocols::agent::UpdateInterfaceRequest::default();
let (ctx, _) = mk_ttrpc_context(); let ctx = mk_ttrpc_context();
let result = agent_service.update_interface(&ctx, req); let result = agent_service.update_interface(&ctx, req).await;
assert!(result.is_err(), "expected update interface to fail"); assert!(result.is_err(), "expected update interface to fail");
} }
#[test] #[tokio::test]
fn test_update_routes() { async fn test_update_routes() {
let logger = slog::Logger::root(slog::Discard, o!()); let logger = slog::Logger::root(slog::Discard, o!());
let sandbox = Sandbox::new(&logger).unwrap(); let sandbox = Sandbox::new(&logger).unwrap();
@ -1764,15 +1761,15 @@ mod tests {
}); });
let req = protocols::agent::UpdateRoutesRequest::default(); let req = protocols::agent::UpdateRoutesRequest::default();
let (ctx, _) = mk_ttrpc_context(); let ctx = mk_ttrpc_context();
let result = agent_service.update_routes(&ctx, req); let result = agent_service.update_routes(&ctx, req).await;
assert!(result.is_err(), "expected update routes to fail"); assert!(result.is_err(), "expected update routes to fail");
} }
#[test] #[tokio::test]
fn test_add_arp_neighbors() { async fn test_add_arp_neighbors() {
let logger = slog::Logger::root(slog::Discard, o!()); let logger = slog::Logger::root(slog::Discard, o!());
let sandbox = Sandbox::new(&logger).unwrap(); let sandbox = Sandbox::new(&logger).unwrap();
@ -1781,9 +1778,9 @@ mod tests {
}); });
let req = protocols::agent::AddARPNeighborsRequest::default(); let req = protocols::agent::AddARPNeighborsRequest::default();
let (ctx, _) = mk_ttrpc_context(); let ctx = mk_ttrpc_context();
let result = agent_service.add_arp_neighbors(&ctx, req); let result = agent_service.add_arp_neighbors(&ctx, req).await;
assert!(result.is_err(), "expected add arp neighbors to fail"); assert!(result.is_err(), "expected add arp neighbors to fail");
} }