mirror of
https://github.com/kata-containers/kata-containers.git
synced 2025-04-28 03:42:09 +00:00
dragonball: vsock add fifo/pipe stream support for passed fd hybridStream
Since the passed fd through unix socket would be any stream fd such as pipe/fifo fd or any other socket fd, thus we should deal with it as a normal hybrid stream instead of a unix stream. Fixes:#7584 Signed-off-by: Fupan Li <fupan.lfp@antgroup.com>
This commit is contained in:
parent
b098960442
commit
39e67b06e9
@ -0,0 +1,94 @@
|
||||
// Copyright 2023 Ant Group. All Rights Reserved.
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use std::any::Any;
|
||||
use std::io::{Error, Read, Write};
|
||||
use std::os::unix::io::{AsRawFd, RawFd};
|
||||
use std::time::Duration;
|
||||
|
||||
use log::error;
|
||||
use nix::errno::Errno;
|
||||
|
||||
use super::{VsockBackendType, VsockStream};
|
||||
|
||||
pub struct HybridStream {
|
||||
pub hybrid_stream: std::fs::File,
|
||||
pub slave_stream: Option<Box<dyn VsockStream>>,
|
||||
}
|
||||
|
||||
impl AsRawFd for HybridStream {
|
||||
fn as_raw_fd(&self) -> RawFd {
|
||||
self.hybrid_stream.as_raw_fd()
|
||||
}
|
||||
}
|
||||
|
||||
impl Read for HybridStream {
|
||||
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
|
||||
self.hybrid_stream.read(buf)
|
||||
}
|
||||
}
|
||||
|
||||
impl Write for HybridStream {
|
||||
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
|
||||
// The slave stream was only used to reply the connect result "ok <port>",
|
||||
// thus it was only used once here, and the data would be replied by the
|
||||
// main stream.
|
||||
if let Some(mut stream) = self.slave_stream.take() {
|
||||
stream.write(buf)
|
||||
} else {
|
||||
self.hybrid_stream.write(buf)
|
||||
}
|
||||
}
|
||||
|
||||
fn flush(&mut self) -> std::io::Result<()> {
|
||||
self.hybrid_stream.flush()
|
||||
}
|
||||
}
|
||||
|
||||
impl VsockStream for HybridStream {
|
||||
fn backend_type(&self) -> VsockBackendType {
|
||||
VsockBackendType::HybridStream
|
||||
}
|
||||
|
||||
fn set_nonblocking(&mut self, nonblocking: bool) -> std::io::Result<()> {
|
||||
let fd = self.hybrid_stream.as_raw_fd();
|
||||
let mut flag = unsafe { libc::fcntl(fd, libc::F_GETFL) };
|
||||
|
||||
if nonblocking {
|
||||
flag = flag | libc::O_NONBLOCK;
|
||||
} else {
|
||||
flag = flag & !libc::O_NONBLOCK;
|
||||
}
|
||||
|
||||
let ret = unsafe { libc::fcntl(fd, libc::F_SETFL, flag) };
|
||||
|
||||
if ret < 0 {
|
||||
error!("failed to set fcntl for fd {} with ret {}", fd, ret);
|
||||
return Err(Error::last_os_error());
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn set_read_timeout(&mut self, _dur: Option<Duration>) -> std::io::Result<()> {
|
||||
error!("unsupported!");
|
||||
Err(Errno::ENOPROTOOPT.into())
|
||||
}
|
||||
|
||||
fn set_write_timeout(&mut self, _dur: Option<Duration>) -> std::io::Result<()> {
|
||||
error!("unsupported!");
|
||||
Err(Errno::ENOPROTOOPT.into())
|
||||
}
|
||||
|
||||
fn as_any(&self) -> &dyn Any {
|
||||
self
|
||||
}
|
||||
|
||||
fn recv_data_fd(
|
||||
&self,
|
||||
_bytes: &mut [u8],
|
||||
_fds: &mut [RawFd],
|
||||
) -> std::io::Result<(usize, usize)> {
|
||||
Err(Errno::ENOPROTOOPT.into())
|
||||
}
|
||||
}
|
@ -9,13 +9,14 @@ use std::io::{Read, Write};
|
||||
use std::os::unix::io::{AsRawFd, RawFd};
|
||||
use std::time::Duration;
|
||||
|
||||
mod hybrid_stream;
|
||||
mod inner;
|
||||
mod tcp;
|
||||
mod unix_stream;
|
||||
|
||||
pub use self::hybrid_stream::HybridStream;
|
||||
pub use self::inner::{VsockInnerBackend, VsockInnerConnector, VsockInnerStream};
|
||||
pub use self::tcp::VsockTcpBackend;
|
||||
pub use self::unix_stream::HybridUnixStreamBackend;
|
||||
pub use self::unix_stream::VsockUnixStreamBackend;
|
||||
|
||||
/// The type of vsock backend.
|
||||
@ -27,6 +28,8 @@ pub enum VsockBackendType {
|
||||
Tcp,
|
||||
/// Inner backend
|
||||
Inner,
|
||||
/// Fd passed hybrid stream backend
|
||||
HybridStream,
|
||||
/// For test purpose
|
||||
#[cfg(test)]
|
||||
Test,
|
||||
|
@ -2,7 +2,6 @@
|
||||
// SPDX-License-Identifier: Apache-2.0
|
||||
|
||||
use std::any::Any;
|
||||
use std::io::{Read, Write};
|
||||
use std::os::unix::io::{AsRawFd, RawFd};
|
||||
use std::os::unix::net::{UnixListener, UnixStream};
|
||||
use std::time::Duration;
|
||||
@ -13,66 +12,6 @@ use sendfd::RecvWithFd;
|
||||
use super::super::{Result, VsockError};
|
||||
use super::{VsockBackend, VsockBackendType, VsockStream};
|
||||
|
||||
pub struct HybridUnixStreamBackend {
|
||||
pub unix_stream: Box<dyn VsockStream>,
|
||||
pub slave_stream: Option<Box<dyn VsockStream>>,
|
||||
}
|
||||
|
||||
impl VsockStream for HybridUnixStreamBackend {
|
||||
fn backend_type(&self) -> VsockBackendType {
|
||||
self.unix_stream.backend_type()
|
||||
}
|
||||
|
||||
fn set_nonblocking(&mut self, nonblocking: bool) -> std::io::Result<()> {
|
||||
self.unix_stream.set_nonblocking(nonblocking)
|
||||
}
|
||||
|
||||
fn set_read_timeout(&mut self, dur: Option<Duration>) -> std::io::Result<()> {
|
||||
self.unix_stream.set_read_timeout(dur)
|
||||
}
|
||||
|
||||
fn set_write_timeout(&mut self, dur: Option<Duration>) -> std::io::Result<()> {
|
||||
self.unix_stream.set_write_timeout(dur)
|
||||
}
|
||||
|
||||
fn as_any(&self) -> &dyn Any {
|
||||
self.unix_stream.as_any()
|
||||
}
|
||||
|
||||
fn recv_data_fd(&self, bytes: &mut [u8], fds: &mut [RawFd]) -> std::io::Result<(usize, usize)> {
|
||||
self.unix_stream.recv_data_fd(bytes, fds)
|
||||
}
|
||||
}
|
||||
|
||||
impl AsRawFd for HybridUnixStreamBackend {
|
||||
fn as_raw_fd(&self) -> RawFd {
|
||||
self.unix_stream.as_raw_fd()
|
||||
}
|
||||
}
|
||||
|
||||
impl Read for HybridUnixStreamBackend {
|
||||
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
|
||||
self.unix_stream.read(buf)
|
||||
}
|
||||
}
|
||||
|
||||
impl Write for HybridUnixStreamBackend {
|
||||
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
|
||||
// The slave stream was only used to reply the connect result "ok <port>",
|
||||
// thus it was only used once here, and the data would be replied by the
|
||||
// main stream.
|
||||
if let Some(mut stream) = self.slave_stream.take() {
|
||||
stream.write(buf)
|
||||
} else {
|
||||
self.unix_stream.write(buf)
|
||||
}
|
||||
}
|
||||
|
||||
fn flush(&mut self) -> std::io::Result<()> {
|
||||
self.unix_stream.flush()
|
||||
}
|
||||
}
|
||||
|
||||
impl VsockStream for UnixStream {
|
||||
fn backend_type(&self) -> VsockBackendType {
|
||||
VsockBackendType::UnixStream
|
||||
|
@ -60,6 +60,10 @@ pub enum Error {
|
||||
#[error("error connecting to a backend: {0}")]
|
||||
BackendConnect(#[source] std::io::Error),
|
||||
|
||||
/// Error set nonblock to a backend stream.
|
||||
#[error("error set nonblocking to a backend: {0}")]
|
||||
BackendSetNonBlock(#[source] std::io::Error),
|
||||
|
||||
/// Error reading from backend.
|
||||
#[error("error reading from backend: {0}")]
|
||||
BackendRead(#[source] std::io::Error),
|
||||
|
@ -36,14 +36,14 @@
|
||||
/// route all these events to their handlers, the muxer uses another
|
||||
/// `HashMap` object, mapping `RawFd`s to `EpollListener`s.
|
||||
use std::collections::{HashMap, HashSet};
|
||||
use std::fs::File;
|
||||
use std::io::Read;
|
||||
use std::os::fd::FromRawFd;
|
||||
use std::os::unix::io::{AsRawFd, RawFd};
|
||||
use std::os::unix::net::UnixStream;
|
||||
|
||||
use log::{debug, error, info, trace, warn};
|
||||
|
||||
use super::super::backend::{HybridUnixStreamBackend, VsockBackend, VsockBackendType, VsockStream};
|
||||
use super::super::backend::{HybridStream, VsockBackend, VsockBackendType, VsockStream};
|
||||
|
||||
use super::super::csm::{ConnState, VsockConnection};
|
||||
use super::super::defs::uapi;
|
||||
@ -480,13 +480,17 @@ impl VsockMuxer {
|
||||
.and_then(|(nfd, local_port, peer_port)| {
|
||||
// Here we should make sure the nfd the sole owner to convert it
|
||||
// into an UnixStream object, otherwise, it could cause memory unsafety.
|
||||
let nstream = unsafe { UnixStream::from_raw_fd(nfd) };
|
||||
let nstream = unsafe { File::from_raw_fd(nfd) };
|
||||
|
||||
let hybridstream = HybridUnixStreamBackend {
|
||||
unix_stream: Box::new(nstream),
|
||||
let mut hybridstream = HybridStream {
|
||||
hybrid_stream: nstream,
|
||||
slave_stream: Some(stream),
|
||||
};
|
||||
|
||||
hybridstream
|
||||
.set_nonblocking(true)
|
||||
.map_err(Error::BackendSetNonBlock)?;
|
||||
|
||||
self.add_connection(
|
||||
ConnMapKey {
|
||||
local_port,
|
||||
|
Loading…
Reference in New Issue
Block a user