mirror of
https://github.com/kata-containers/kata-containers.git
synced 2025-04-30 04:34:27 +00:00
agent: async the debug console
Make the debug console in this commit. Finish the rework of debug console. Fixes: #1647 Signed-off-by: Tim Zhang <tim@hyper.sh>
This commit is contained in:
parent
9017e1100b
commit
790332575b
@ -4,10 +4,9 @@
|
|||||||
// SPDX-License-Identifier: Apache-2.0
|
// SPDX-License-Identifier: Apache-2.0
|
||||||
//
|
//
|
||||||
|
|
||||||
|
use crate::util;
|
||||||
use anyhow::{anyhow, Result};
|
use anyhow::{anyhow, Result};
|
||||||
use lazy_static;
|
use nix::fcntl::{self, FcntlArg, FdFlag, OFlag};
|
||||||
use nix::fcntl::{self, OFlag};
|
|
||||||
use nix::fcntl::{FcntlArg, FdFlag};
|
|
||||||
use nix::libc::{STDERR_FILENO, STDIN_FILENO, STDOUT_FILENO};
|
use nix::libc::{STDERR_FILENO, STDIN_FILENO, STDOUT_FILENO};
|
||||||
use nix::pty::{openpty, OpenptyResult};
|
use nix::pty::{openpty, OpenptyResult};
|
||||||
use nix::sys::socket::{self, AddressFamily, SockAddr, SockFlag, SockType};
|
use nix::sys::socket::{self, AddressFamily, SockAddr, SockFlag, SockType};
|
||||||
@ -16,18 +15,19 @@ use nix::sys::wait;
|
|||||||
use nix::unistd::{self, close, dup2, fork, setsid, ForkResult, Pid};
|
use nix::unistd::{self, close, dup2, fork, setsid, ForkResult, Pid};
|
||||||
use rustjail::pipestream::PipeStream;
|
use rustjail::pipestream::PipeStream;
|
||||||
use slog::Logger;
|
use slog::Logger;
|
||||||
use std::ffi::{CStr, CString};
|
use std::ffi::CString;
|
||||||
use std::fs::File;
|
|
||||||
use std::io::{Read, Write};
|
|
||||||
use std::os::unix::io::{FromRawFd, RawFd};
|
use std::os::unix::io::{FromRawFd, RawFd};
|
||||||
use std::path::PathBuf;
|
use std::path::PathBuf;
|
||||||
|
use std::process::Stdio;
|
||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use std::sync::Mutex as SyncMutex;
|
use std::sync::Mutex as SyncMutex;
|
||||||
|
|
||||||
|
use futures::StreamExt;
|
||||||
|
use tokio::io::{AsyncRead, AsyncWrite};
|
||||||
|
use tokio::select;
|
||||||
use tokio::sync::watch::Receiver;
|
use tokio::sync::watch::Receiver;
|
||||||
use tokio::{pin, select};
|
|
||||||
|
|
||||||
const CONSOLE_PATH: &str = "/dev/console";
|
const CONSOLE_PATH: &str = "/dev/console";
|
||||||
const DEFAULT_BUF_SIZE: usize = 8 * 1024;
|
|
||||||
|
|
||||||
lazy_static! {
|
lazy_static! {
|
||||||
static ref SHELLS: Arc<SyncMutex<Vec<String>>> = {
|
static ref SHELLS: Arc<SyncMutex<Vec<String>>> = {
|
||||||
@ -53,16 +53,13 @@ pub async fn debug_console_handler(
|
|||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let logger = logger.new(o!("subsystem" => "debug-console"));
|
let logger = logger.new(o!("subsystem" => "debug-console"));
|
||||||
|
|
||||||
let shells = SHELLS.clone();
|
let shells = SHELLS.lock().unwrap().to_vec();
|
||||||
let shells = shells.lock().unwrap().to_vec();
|
|
||||||
|
|
||||||
let shell = shells
|
let shell = shells
|
||||||
.iter()
|
.into_iter()
|
||||||
.find(|sh| PathBuf::from(sh).exists())
|
.find(|sh| PathBuf::from(sh).exists())
|
||||||
.ok_or_else(|| anyhow!("no shell found to launch debug console"))?;
|
.ok_or_else(|| anyhow!("no shell found to launch debug console"))?;
|
||||||
|
|
||||||
let fd: RawFd;
|
|
||||||
|
|
||||||
if port > 0 {
|
if port > 0 {
|
||||||
let listenfd = socket::socket(
|
let listenfd = socket::socket(
|
||||||
AddressFamily::Vsock,
|
AddressFamily::Vsock,
|
||||||
@ -74,40 +71,66 @@ pub async fn debug_console_handler(
|
|||||||
socket::bind(listenfd, &addr)?;
|
socket::bind(listenfd, &addr)?;
|
||||||
socket::listen(listenfd, 1)?;
|
socket::listen(listenfd, 1)?;
|
||||||
|
|
||||||
fd = socket::accept4(listenfd, SockFlag::SOCK_CLOEXEC)?;
|
let mut incoming = util::get_vsock_incoming(listenfd);
|
||||||
|
|
||||||
|
loop {
|
||||||
|
select! {
|
||||||
|
_ = shutdown.changed() => {
|
||||||
|
info!(logger, "debug console got shutdown request");
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
conn = incoming.next() => {
|
||||||
|
if let Some(conn) = conn {
|
||||||
|
// Accept a new connection
|
||||||
|
match conn {
|
||||||
|
Ok(stream) => {
|
||||||
|
let logger = logger.clone();
|
||||||
|
let shell = shell.clone();
|
||||||
|
// Do not block(await) here, or we'll never receive the shutdown signal
|
||||||
|
tokio::spawn(async move {
|
||||||
|
let _ = run_debug_console_vsock(logger, shell, stream).await;
|
||||||
|
});
|
||||||
|
}
|
||||||
|
Err(e) => {
|
||||||
|
error!(logger, "{:?}", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
} else {
|
} else {
|
||||||
let mut flags = OFlag::empty();
|
let mut flags = OFlag::empty();
|
||||||
flags.insert(OFlag::O_RDWR);
|
flags.insert(OFlag::O_RDWR);
|
||||||
flags.insert(OFlag::O_CLOEXEC);
|
flags.insert(OFlag::O_CLOEXEC);
|
||||||
|
|
||||||
fd = fcntl::open(CONSOLE_PATH, flags, Mode::empty())?;
|
let fd = fcntl::open(CONSOLE_PATH, flags, Mode::empty())?;
|
||||||
};
|
|
||||||
|
|
||||||
loop {
|
|
||||||
select! {
|
select! {
|
||||||
_ = shutdown.changed() => {
|
_ = shutdown.changed() => {
|
||||||
info!(logger, "got shutdown request");
|
info!(logger, "debug console got shutdown request");
|
||||||
break;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// BUG: FIXME: wait on parent.
|
result = run_debug_console_serial(shell.clone(), fd) => {
|
||||||
//result = run_debug_console_shell(logger.clone(), shell, fd, shutdown.clone()) => {
|
match result {
|
||||||
// match result {
|
Ok(_) => {
|
||||||
// Ok(_) => {
|
info!(logger, "run_debug_console_shell session finished");
|
||||||
// info!(logger, "run_debug_console_shell session finished");
|
}
|
||||||
// }
|
Err(err) => {
|
||||||
// Err(err) => {
|
error!(logger, "run_debug_console_shell failed: {:?}", err);
|
||||||
// error!(logger, "run_debug_console_shell failed: {:?}", err);
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
//}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
};
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
fn run_in_child(slave_fd: libc::c_int, shell: &str) -> Result<()> {
|
fn run_in_child(slave_fd: libc::c_int, shell: String) -> Result<()> {
|
||||||
// create new session with child as session leader
|
// create new session with child as session leader
|
||||||
setsid()?;
|
setsid()?;
|
||||||
|
|
||||||
@ -122,10 +145,9 @@ fn run_in_child(slave_fd: libc::c_int, shell: &str) -> Result<()> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
let cmd = CString::new(shell).unwrap();
|
let cmd = CString::new(shell).unwrap();
|
||||||
let args: Vec<&CStr> = vec![];
|
|
||||||
|
|
||||||
// run shell
|
// run shell
|
||||||
let _ = unistd::execvp(cmd.as_c_str(), args.as_slice()).map_err(|e| match e {
|
let _ = unistd::execvp(cmd.as_c_str(), &[]).map_err(|e| match e {
|
||||||
nix::Error::Sys(errno) => {
|
nix::Error::Sys(errno) => {
|
||||||
std::process::exit(errno as i32);
|
std::process::exit(errno as i32);
|
||||||
}
|
}
|
||||||
@ -135,122 +157,45 @@ fn run_in_child(slave_fd: libc::c_int, shell: &str) -> Result<()> {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn run_in_parent(
|
async fn run_in_parent<T: AsyncRead + AsyncWrite>(
|
||||||
logger: Logger,
|
logger: Logger,
|
||||||
mut shutdown: Receiver<bool>,
|
stream: T,
|
||||||
socket_fd: RawFd,
|
|
||||||
pseudo: OpenptyResult,
|
pseudo: OpenptyResult,
|
||||||
child_pid: Pid,
|
child_pid: Pid,
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
info!(logger, "get debug shell pid {:?}", child_pid);
|
info!(logger, "get debug shell pid {:?}", child_pid);
|
||||||
|
|
||||||
let (rfd, wfd) = unistd::pipe2(OFlag::O_CLOEXEC)?;
|
|
||||||
let master_fd = pseudo.master;
|
let master_fd = pseudo.master;
|
||||||
let slave_fd = pseudo.slave;
|
let _ = close(pseudo.slave);
|
||||||
//let debug_shell_logger = logger.clone();
|
|
||||||
|
|
||||||
let logger = logger.clone();
|
let (mut socket_reader, mut socket_writer) = tokio::io::split(stream);
|
||||||
|
let (mut master_reader, mut master_writer) = tokio::io::split(PipeStream::from_fd(master_fd));
|
||||||
|
|
||||||
// channel that used to sync between thread and main process
|
|
||||||
let (tx, rx) = std::sync::mpsc::channel::<i32>();
|
|
||||||
|
|
||||||
// start a thread to do IO copy between socket and pseudo.master
|
|
||||||
//tokio::spawn(async move {
|
|
||||||
//let logger = logger.clone();
|
|
||||||
//let mut shutdown = shutdown.clone();
|
|
||||||
|
|
||||||
//let mut master_reader = unsafe { File::from_raw_fd(master_fd) };
|
|
||||||
//let mut master_writer = unsafe { File::from_raw_fd(master_fd) };
|
|
||||||
//let mut socket_reader = unsafe { File::from_raw_fd(socket_fd) };
|
|
||||||
//let mut socket_writer = unsafe { File::from_raw_fd(socket_fd) };
|
|
||||||
|
|
||||||
let mut pipe_reader = PipeStream::from_fd(rfd);
|
|
||||||
|
|
||||||
//let mut pty_master_reader = PipeStream::from_fd(master_fd);
|
|
||||||
//let mut socket_reader = PipeStream::from_fd(socket_fd);
|
|
||||||
|
|
||||||
//pin!(pipe_reader);
|
|
||||||
|
|
||||||
// BUG: FIXME: add blocks for pipe_reader, master_fd and socket_fd
|
|
||||||
// (see commented out code below).
|
|
||||||
loop {
|
|
||||||
select! {
|
select! {
|
||||||
_ = shutdown.changed() => {
|
res = tokio::io::copy(&mut master_reader, &mut socket_writer) => {
|
||||||
info!(logger, "got shutdown request");
|
debug!(
|
||||||
break;
|
logger,
|
||||||
},
|
"master closed: {:?}", res
|
||||||
_ = pipe_reader => {
|
);
|
||||||
info!(
|
}
|
||||||
debug_shell_logger,
|
res = tokio::io::copy(&mut socket_reader, &mut master_writer) => {
|
||||||
"debug shell process {} exited", child_pid
|
info!(
|
||||||
|
logger,
|
||||||
|
"socket closed: {:?}", res
|
||||||
);
|
);
|
||||||
tx.send(1).unwrap();
|
|
||||||
},
|
|
||||||
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// if fd_set.contains(master_fd) {
|
|
||||||
// match io_copy(&mut master_reader, &mut socket_writer) {
|
|
||||||
// Ok(0) => {
|
|
||||||
// debug!(debug_shell_logger, "master fd closed");
|
|
||||||
// tx.send(1).unwrap();
|
|
||||||
// break;
|
|
||||||
// }
|
|
||||||
// Ok(_) => {}
|
|
||||||
// Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
|
|
||||||
// Err(e) => {
|
|
||||||
// error!(debug_shell_logger, "read master fd error {:?}", e);
|
|
||||||
// tx.send(1).unwrap();
|
|
||||||
// break;
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
|
|
||||||
// if fd_set.contains(socket_fd) {
|
|
||||||
// match io_copy(&mut socket_reader, &mut master_writer) {
|
|
||||||
// Ok(0) => {
|
|
||||||
// debug!(debug_shell_logger, "socket fd closed");
|
|
||||||
// tx.send(1).unwrap();
|
|
||||||
// break;
|
|
||||||
// }
|
|
||||||
// Ok(_) => {}
|
|
||||||
// Err(ref e) if e.kind() == std::io::ErrorKind::Interrupted => continue,
|
|
||||||
// Err(e) => {
|
|
||||||
// error!(debug_shell_logger, "read socket fd error {:?}", e);
|
|
||||||
// tx.send(1).unwrap();
|
|
||||||
// break;
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
// }
|
|
||||||
//}
|
|
||||||
//})
|
|
||||||
//.await;
|
|
||||||
|
|
||||||
let wait_status = wait::waitpid(child_pid, None);
|
let wait_status = wait::waitpid(child_pid, None);
|
||||||
info!(logger, "debug console process exit code: {:?}", wait_status);
|
info!(logger, "debug console process exit code: {:?}", wait_status);
|
||||||
|
|
||||||
info!(logger, "notify debug monitor thread to exit");
|
|
||||||
// close pipe to exit select loop
|
|
||||||
let _ = close(wfd);
|
|
||||||
|
|
||||||
// wait for thread exit.
|
|
||||||
let _ = rx.recv().unwrap();
|
|
||||||
info!(logger, "debug monitor thread has exited");
|
|
||||||
|
|
||||||
// close files
|
|
||||||
let _ = close(rfd);
|
|
||||||
let _ = close(master_fd);
|
|
||||||
let _ = close(slave_fd);
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
async fn run_debug_console_shell(
|
async fn run_debug_console_vsock<T: AsyncRead + AsyncWrite>(
|
||||||
logger: Logger,
|
logger: Logger,
|
||||||
shell: &str,
|
shell: String,
|
||||||
socket_fd: RawFd,
|
stream: T,
|
||||||
shutdown: Receiver<bool>,
|
|
||||||
) -> Result<()> {
|
) -> Result<()> {
|
||||||
let logger = logger.new(o!("subsystem" => "debug-console-shell"));
|
let logger = logger.new(o!("subsystem" => "debug-console-shell"));
|
||||||
|
|
||||||
@ -263,19 +208,30 @@ async fn run_debug_console_shell(
|
|||||||
match fork() {
|
match fork() {
|
||||||
Ok(ForkResult::Child) => run_in_child(slave_fd, shell),
|
Ok(ForkResult::Child) => run_in_child(slave_fd, shell),
|
||||||
Ok(ForkResult::Parent { child: child_pid }) => {
|
Ok(ForkResult::Parent { child: child_pid }) => {
|
||||||
run_in_parent(
|
run_in_parent(logger.clone(), stream, pseudo, child_pid).await
|
||||||
logger.clone(),
|
|
||||||
shutdown.clone(),
|
|
||||||
socket_fd,
|
|
||||||
pseudo,
|
|
||||||
child_pid,
|
|
||||||
)
|
|
||||||
.await
|
|
||||||
}
|
}
|
||||||
Err(err) => Err(anyhow!("fork error: {:?}", err)),
|
Err(err) => Err(anyhow!("fork error: {:?}", err)),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
async fn run_debug_console_serial(shell: String, fd: RawFd) -> Result<()> {
|
||||||
|
let mut child = match tokio::process::Command::new(shell)
|
||||||
|
.arg("-i")
|
||||||
|
.kill_on_drop(true)
|
||||||
|
.stdin(unsafe { Stdio::from_raw_fd(fd) })
|
||||||
|
.stdout(unsafe { Stdio::from_raw_fd(fd) })
|
||||||
|
.stderr(unsafe { Stdio::from_raw_fd(fd) })
|
||||||
|
.spawn()
|
||||||
|
{
|
||||||
|
Ok(c) => c,
|
||||||
|
Err(_) => return Err(anyhow!("failed to spawn shell")),
|
||||||
|
};
|
||||||
|
|
||||||
|
child.wait().await?;
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
// BUG: FIXME:
|
// BUG: FIXME:
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
@ -328,26 +284,3 @@ mod tests {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// BUG: FIXME: should not be required as we can use the
|
|
||||||
// interruptable_io_copier(). But if it is still needed, move to utils.rs.
|
|
||||||
fn io_copy<R: ?Sized, W: ?Sized>(reader: &mut R, writer: &mut W) -> std::io::Result<u64>
|
|
||||||
where
|
|
||||||
R: Read,
|
|
||||||
W: Write,
|
|
||||||
{
|
|
||||||
let mut buf = [0; DEFAULT_BUF_SIZE];
|
|
||||||
let buf_len;
|
|
||||||
|
|
||||||
match reader.read(&mut buf) {
|
|
||||||
Ok(0) => return Ok(0),
|
|
||||||
Ok(len) => buf_len = len,
|
|
||||||
Err(err) => return Err(err),
|
|
||||||
};
|
|
||||||
|
|
||||||
// write and return
|
|
||||||
match writer.write_all(&buf[..buf_len]) {
|
|
||||||
Ok(_) => Ok(buf_len as u64),
|
|
||||||
Err(err) => Err(err),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
@ -20,11 +20,9 @@ extern crate scopeguard;
|
|||||||
extern crate slog;
|
extern crate slog;
|
||||||
|
|
||||||
use anyhow::{anyhow, Context, Result};
|
use anyhow::{anyhow, Context, Result};
|
||||||
use nix::fcntl::{self, OFlag};
|
use nix::fcntl::OFlag;
|
||||||
use nix::fcntl::{FcntlArg, FdFlag};
|
|
||||||
use nix::sys::socket::{self, AddressFamily, SockAddr, SockFlag, SockType};
|
use nix::sys::socket::{self, AddressFamily, SockAddr, SockFlag, SockType};
|
||||||
use nix::unistd::{self, dup, Pid};
|
use nix::unistd::{self, dup, Pid};
|
||||||
use std::collections::HashMap;
|
|
||||||
use std::env;
|
use std::env;
|
||||||
use std::ffi::OsStr;
|
use std::ffi::OsStr;
|
||||||
use std::fs::{self, File};
|
use std::fs::{self, File};
|
||||||
@ -61,7 +59,6 @@ use slog::Logger;
|
|||||||
use uevent::watch_uevents;
|
use uevent::watch_uevents;
|
||||||
|
|
||||||
use futures::future::join_all;
|
use futures::future::join_all;
|
||||||
use futures::StreamExt as _;
|
|
||||||
use rustjail::pipestream::PipeStream;
|
use rustjail::pipestream::PipeStream;
|
||||||
use tokio::{
|
use tokio::{
|
||||||
io::AsyncWrite,
|
io::AsyncWrite,
|
||||||
@ -71,7 +68,6 @@ use tokio::{
|
|||||||
},
|
},
|
||||||
task::JoinHandle,
|
task::JoinHandle,
|
||||||
};
|
};
|
||||||
use tokio_vsock::{Incoming, VsockListener, VsockStream};
|
|
||||||
|
|
||||||
mod rpc;
|
mod rpc;
|
||||||
|
|
||||||
@ -96,27 +92,6 @@ fn announce(logger: &Logger, config: &AgentConfig) {
|
|||||||
);
|
);
|
||||||
}
|
}
|
||||||
|
|
||||||
fn set_fd_close_exec(fd: RawFd) -> Result<RawFd> {
|
|
||||||
if let Err(e) = fcntl::fcntl(fd, FcntlArg::F_SETFD(FdFlag::FD_CLOEXEC)) {
|
|
||||||
return Err(anyhow!("failed to set fd: {} as close-on-exec: {}", fd, e));
|
|
||||||
}
|
|
||||||
Ok(fd)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn get_vsock_incoming(fd: RawFd) -> Incoming {
|
|
||||||
let incoming;
|
|
||||||
unsafe {
|
|
||||||
incoming = VsockListener::from_raw_fd(fd).incoming();
|
|
||||||
}
|
|
||||||
incoming
|
|
||||||
}
|
|
||||||
|
|
||||||
async fn get_vsock_stream(fd: RawFd) -> Result<VsockStream> {
|
|
||||||
let stream = get_vsock_incoming(fd).next().await.unwrap().unwrap();
|
|
||||||
set_fd_close_exec(stream.as_raw_fd())?;
|
|
||||||
Ok(stream)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Create a thread to handle reading from the logger pipe. The thread will
|
// Create a thread to handle reading from the logger pipe. The thread will
|
||||||
// output to the vsock port specified, or stdout.
|
// output to the vsock port specified, or stdout.
|
||||||
async fn create_logger_task(rfd: RawFd, vsock_port: u32, shutdown: Receiver<bool>) -> Result<()> {
|
async fn create_logger_task(rfd: RawFd, vsock_port: u32, shutdown: Receiver<bool>) -> Result<()> {
|
||||||
@ -135,7 +110,7 @@ async fn create_logger_task(rfd: RawFd, vsock_port: u32, shutdown: Receiver<bool
|
|||||||
socket::bind(listenfd, &addr).unwrap();
|
socket::bind(listenfd, &addr).unwrap();
|
||||||
socket::listen(listenfd, 1).unwrap();
|
socket::listen(listenfd, 1).unwrap();
|
||||||
|
|
||||||
writer = Box::new(get_vsock_stream(listenfd).await.unwrap());
|
writer = Box::new(util::get_vsock_stream(listenfd).await.unwrap());
|
||||||
} else {
|
} else {
|
||||||
writer = Box::new(tokio::io::stdout());
|
writer = Box::new(tokio::io::stdout());
|
||||||
}
|
}
|
||||||
|
@ -3,10 +3,14 @@
|
|||||||
// SPDX-License-Identifier: Apache-2.0
|
// SPDX-License-Identifier: Apache-2.0
|
||||||
//
|
//
|
||||||
|
|
||||||
|
use anyhow::Result;
|
||||||
|
use futures::StreamExt;
|
||||||
use std::io;
|
use std::io;
|
||||||
use std::io::ErrorKind;
|
use std::io::ErrorKind;
|
||||||
|
use std::os::unix::io::{FromRawFd, RawFd};
|
||||||
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
use tokio::io::{AsyncReadExt, AsyncWriteExt};
|
||||||
use tokio::sync::watch::Receiver;
|
use tokio::sync::watch::Receiver;
|
||||||
|
use tokio_vsock::{Incoming, VsockListener, VsockStream};
|
||||||
|
|
||||||
// Size of I/O read buffer
|
// Size of I/O read buffer
|
||||||
const BUF_SIZE: usize = 8192;
|
const BUF_SIZE: usize = 8192;
|
||||||
@ -52,6 +56,15 @@ where
|
|||||||
Ok(total_bytes)
|
Ok(total_bytes)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn get_vsock_incoming(fd: RawFd) -> Incoming {
|
||||||
|
unsafe { VsockListener::from_raw_fd(fd).incoming() }
|
||||||
|
}
|
||||||
|
|
||||||
|
pub async fn get_vsock_stream(fd: RawFd) -> Result<VsockStream> {
|
||||||
|
let stream = get_vsock_incoming(fd).next().await.unwrap()?;
|
||||||
|
Ok(stream)
|
||||||
|
}
|
||||||
|
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
mod tests {
|
mod tests {
|
||||||
use super::*;
|
use super::*;
|
||||||
|
Loading…
Reference in New Issue
Block a user