mirror of
https://github.com/kata-containers/kata-containers.git
synced 2025-06-23 14:08:31 +00:00
runtime-rs: make send_message() as an method of ServiceManager
Simplify implementation by making send_message() as an method of ServiceManager. Fixes: #7479 Signed-off-by: Jiang Liu <gerry@linux.alibaba.com>
This commit is contained in:
parent
1cc1c81c9a
commit
458e1bc712
@ -53,47 +53,6 @@ impl std::fmt::Debug for ServiceManager {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn send_event(
|
|
||||||
containerd_binary: String,
|
|
||||||
address: String,
|
|
||||||
namespace: String,
|
|
||||||
event: Arc<dyn Event>,
|
|
||||||
) -> Result<()> {
|
|
||||||
let any = Any {
|
|
||||||
type_url: event.type_url(),
|
|
||||||
value: event.value().context("get event value")?,
|
|
||||||
..Default::default()
|
|
||||||
};
|
|
||||||
let data = any.write_to_bytes().context("write to any")?;
|
|
||||||
let mut child = Command::new(containerd_binary)
|
|
||||||
.stdin(Stdio::piped())
|
|
||||||
.stdout(Stdio::piped())
|
|
||||||
.stderr(Stdio::piped())
|
|
||||||
.args([
|
|
||||||
"--address",
|
|
||||||
&address,
|
|
||||||
"publish",
|
|
||||||
"--topic",
|
|
||||||
&event.r#type(),
|
|
||||||
"--namespace",
|
|
||||||
&namespace,
|
|
||||||
])
|
|
||||||
.spawn()
|
|
||||||
.context("spawn containerd cmd to publish event")?;
|
|
||||||
|
|
||||||
let stdin = child.stdin.as_mut().context("failed to open stdin")?;
|
|
||||||
stdin
|
|
||||||
.write_all(&data)
|
|
||||||
.await
|
|
||||||
.context("failed to write to stdin")?;
|
|
||||||
let output = child
|
|
||||||
.wait_with_output()
|
|
||||||
.await
|
|
||||||
.context("failed to read stdout")?;
|
|
||||||
info!(sl!(), "get output: {:?}", output);
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
impl ServiceManager {
|
impl ServiceManager {
|
||||||
// TODO: who manages lifecycle for `task_server_fd`?
|
// TODO: who manages lifecycle for `task_server_fd`?
|
||||||
pub async fn new(
|
pub async fn new(
|
||||||
@ -137,14 +96,7 @@ impl ServiceManager {
|
|||||||
}
|
}
|
||||||
Action::Event(event) => {
|
Action::Event(event) => {
|
||||||
info!(sl!(), "get event {:?}", &event);
|
info!(sl!(), "get event {:?}", &event);
|
||||||
send_event(
|
self.send_event(event).await.context("send event")
|
||||||
self.binary.clone(),
|
|
||||||
self.address.clone(),
|
|
||||||
self.namespace.clone(),
|
|
||||||
event,
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
.context("send event")
|
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
@ -202,4 +154,40 @@ impl ServiceManager {
|
|||||||
}
|
}
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn send_event(&self, event: Arc<dyn Event>) -> Result<()> {
|
||||||
|
let any = Any {
|
||||||
|
type_url: event.type_url(),
|
||||||
|
value: event.value().context("get event value")?,
|
||||||
|
..Default::default()
|
||||||
|
};
|
||||||
|
let data = any.write_to_bytes().context("write to any")?;
|
||||||
|
let mut child = Command::new(&self.binary)
|
||||||
|
.stdin(Stdio::piped())
|
||||||
|
.stdout(Stdio::piped())
|
||||||
|
.stderr(Stdio::piped())
|
||||||
|
.args([
|
||||||
|
"--address",
|
||||||
|
&self.address,
|
||||||
|
"publish",
|
||||||
|
"--topic",
|
||||||
|
&event.r#type(),
|
||||||
|
"--namespace",
|
||||||
|
&self.namespace,
|
||||||
|
])
|
||||||
|
.spawn()
|
||||||
|
.context("spawn containerd cmd to publish event")?;
|
||||||
|
|
||||||
|
let stdin = child.stdin.as_mut().context("failed to open stdin")?;
|
||||||
|
stdin
|
||||||
|
.write_all(&data)
|
||||||
|
.await
|
||||||
|
.context("failed to write to stdin")?;
|
||||||
|
let output = child
|
||||||
|
.wait_with_output()
|
||||||
|
.await
|
||||||
|
.context("failed to read stdout")?;
|
||||||
|
info!(sl!(), "get output: {:?}", output);
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user