runtime-rs: make send_message() as an method of ServiceManager

Simplify implementation by making send_message() as an method of
ServiceManager.

Fixes: #7479

Signed-off-by: Jiang Liu <gerry@linux.alibaba.com>
This commit is contained in:
Jiang Liu 2023-07-28 14:27:30 +08:00
parent 1cc1c81c9a
commit 458e1bc712

View File

@ -53,47 +53,6 @@ impl std::fmt::Debug for ServiceManager {
} }
} }
async fn send_event(
containerd_binary: String,
address: String,
namespace: String,
event: Arc<dyn Event>,
) -> Result<()> {
let any = Any {
type_url: event.type_url(),
value: event.value().context("get event value")?,
..Default::default()
};
let data = any.write_to_bytes().context("write to any")?;
let mut child = Command::new(containerd_binary)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.args([
"--address",
&address,
"publish",
"--topic",
&event.r#type(),
"--namespace",
&namespace,
])
.spawn()
.context("spawn containerd cmd to publish event")?;
let stdin = child.stdin.as_mut().context("failed to open stdin")?;
stdin
.write_all(&data)
.await
.context("failed to write to stdin")?;
let output = child
.wait_with_output()
.await
.context("failed to read stdout")?;
info!(sl!(), "get output: {:?}", output);
Ok(())
}
impl ServiceManager { impl ServiceManager {
// TODO: who manages lifecycle for `task_server_fd`? // TODO: who manages lifecycle for `task_server_fd`?
pub async fn new( pub async fn new(
@ -137,14 +96,7 @@ impl ServiceManager {
} }
Action::Event(event) => { Action::Event(event) => {
info!(sl!(), "get event {:?}", &event); info!(sl!(), "get event {:?}", &event);
send_event( self.send_event(event).await.context("send event")
self.binary.clone(),
self.address.clone(),
self.namespace.clone(),
event,
)
.await
.context("send event")
} }
}; };
@ -202,4 +154,40 @@ impl ServiceManager {
} }
Ok(()) Ok(())
} }
async fn send_event(&self, event: Arc<dyn Event>) -> Result<()> {
let any = Any {
type_url: event.type_url(),
value: event.value().context("get event value")?,
..Default::default()
};
let data = any.write_to_bytes().context("write to any")?;
let mut child = Command::new(&self.binary)
.stdin(Stdio::piped())
.stdout(Stdio::piped())
.stderr(Stdio::piped())
.args([
"--address",
&self.address,
"publish",
"--topic",
&event.r#type(),
"--namespace",
&self.namespace,
])
.spawn()
.context("spawn containerd cmd to publish event")?;
let stdin = child.stdin.as_mut().context("failed to open stdin")?;
stdin
.write_all(&data)
.await
.context("failed to write to stdin")?;
let output = child
.wait_with_output()
.await
.context("failed to read stdout")?;
info!(sl!(), "get output: {:?}", output);
Ok(())
}
} }