mirror of
https://github.com/kata-containers/kata-containers.git
synced 2025-04-28 11:44:38 +00:00
dragonball: vsock add fifo/pipe stream support for passed fd hybridStream
Since the passed fd through unix socket would be any stream fd such as pipe/fifo fd or any other socket fd, thus we should deal with it as a normal hybrid stream instead of a unix stream. Fixes:#7584 Signed-off-by: Fupan Li <fupan.lfp@antgroup.com>
This commit is contained in:
parent
b098960442
commit
39e67b06e9
@ -0,0 +1,94 @@
|
|||||||
|
// Copyright 2023 Ant Group. All Rights Reserved.
|
||||||
|
// SPDX-License-Identifier: Apache-2.0
|
||||||
|
|
||||||
|
use std::any::Any;
|
||||||
|
use std::io::{Error, Read, Write};
|
||||||
|
use std::os::unix::io::{AsRawFd, RawFd};
|
||||||
|
use std::time::Duration;
|
||||||
|
|
||||||
|
use log::error;
|
||||||
|
use nix::errno::Errno;
|
||||||
|
|
||||||
|
use super::{VsockBackendType, VsockStream};
|
||||||
|
|
||||||
|
pub struct HybridStream {
|
||||||
|
pub hybrid_stream: std::fs::File,
|
||||||
|
pub slave_stream: Option<Box<dyn VsockStream>>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl AsRawFd for HybridStream {
|
||||||
|
fn as_raw_fd(&self) -> RawFd {
|
||||||
|
self.hybrid_stream.as_raw_fd()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Read for HybridStream {
|
||||||
|
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
|
||||||
|
self.hybrid_stream.read(buf)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl Write for HybridStream {
|
||||||
|
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
|
||||||
|
// The slave stream was only used to reply the connect result "ok <port>",
|
||||||
|
// thus it was only used once here, and the data would be replied by the
|
||||||
|
// main stream.
|
||||||
|
if let Some(mut stream) = self.slave_stream.take() {
|
||||||
|
stream.write(buf)
|
||||||
|
} else {
|
||||||
|
self.hybrid_stream.write(buf)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fn flush(&mut self) -> std::io::Result<()> {
|
||||||
|
self.hybrid_stream.flush()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl VsockStream for HybridStream {
|
||||||
|
fn backend_type(&self) -> VsockBackendType {
|
||||||
|
VsockBackendType::HybridStream
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set_nonblocking(&mut self, nonblocking: bool) -> std::io::Result<()> {
|
||||||
|
let fd = self.hybrid_stream.as_raw_fd();
|
||||||
|
let mut flag = unsafe { libc::fcntl(fd, libc::F_GETFL) };
|
||||||
|
|
||||||
|
if nonblocking {
|
||||||
|
flag = flag | libc::O_NONBLOCK;
|
||||||
|
} else {
|
||||||
|
flag = flag & !libc::O_NONBLOCK;
|
||||||
|
}
|
||||||
|
|
||||||
|
let ret = unsafe { libc::fcntl(fd, libc::F_SETFL, flag) };
|
||||||
|
|
||||||
|
if ret < 0 {
|
||||||
|
error!("failed to set fcntl for fd {} with ret {}", fd, ret);
|
||||||
|
return Err(Error::last_os_error());
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set_read_timeout(&mut self, _dur: Option<Duration>) -> std::io::Result<()> {
|
||||||
|
error!("unsupported!");
|
||||||
|
Err(Errno::ENOPROTOOPT.into())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn set_write_timeout(&mut self, _dur: Option<Duration>) -> std::io::Result<()> {
|
||||||
|
error!("unsupported!");
|
||||||
|
Err(Errno::ENOPROTOOPT.into())
|
||||||
|
}
|
||||||
|
|
||||||
|
fn as_any(&self) -> &dyn Any {
|
||||||
|
self
|
||||||
|
}
|
||||||
|
|
||||||
|
fn recv_data_fd(
|
||||||
|
&self,
|
||||||
|
_bytes: &mut [u8],
|
||||||
|
_fds: &mut [RawFd],
|
||||||
|
) -> std::io::Result<(usize, usize)> {
|
||||||
|
Err(Errno::ENOPROTOOPT.into())
|
||||||
|
}
|
||||||
|
}
|
@ -9,13 +9,14 @@ use std::io::{Read, Write};
|
|||||||
use std::os::unix::io::{AsRawFd, RawFd};
|
use std::os::unix::io::{AsRawFd, RawFd};
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
|
mod hybrid_stream;
|
||||||
mod inner;
|
mod inner;
|
||||||
mod tcp;
|
mod tcp;
|
||||||
mod unix_stream;
|
mod unix_stream;
|
||||||
|
|
||||||
|
pub use self::hybrid_stream::HybridStream;
|
||||||
pub use self::inner::{VsockInnerBackend, VsockInnerConnector, VsockInnerStream};
|
pub use self::inner::{VsockInnerBackend, VsockInnerConnector, VsockInnerStream};
|
||||||
pub use self::tcp::VsockTcpBackend;
|
pub use self::tcp::VsockTcpBackend;
|
||||||
pub use self::unix_stream::HybridUnixStreamBackend;
|
|
||||||
pub use self::unix_stream::VsockUnixStreamBackend;
|
pub use self::unix_stream::VsockUnixStreamBackend;
|
||||||
|
|
||||||
/// The type of vsock backend.
|
/// The type of vsock backend.
|
||||||
@ -27,6 +28,8 @@ pub enum VsockBackendType {
|
|||||||
Tcp,
|
Tcp,
|
||||||
/// Inner backend
|
/// Inner backend
|
||||||
Inner,
|
Inner,
|
||||||
|
/// Fd passed hybrid stream backend
|
||||||
|
HybridStream,
|
||||||
/// For test purpose
|
/// For test purpose
|
||||||
#[cfg(test)]
|
#[cfg(test)]
|
||||||
Test,
|
Test,
|
||||||
|
@ -2,7 +2,6 @@
|
|||||||
// SPDX-License-Identifier: Apache-2.0
|
// SPDX-License-Identifier: Apache-2.0
|
||||||
|
|
||||||
use std::any::Any;
|
use std::any::Any;
|
||||||
use std::io::{Read, Write};
|
|
||||||
use std::os::unix::io::{AsRawFd, RawFd};
|
use std::os::unix::io::{AsRawFd, RawFd};
|
||||||
use std::os::unix::net::{UnixListener, UnixStream};
|
use std::os::unix::net::{UnixListener, UnixStream};
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
@ -13,66 +12,6 @@ use sendfd::RecvWithFd;
|
|||||||
use super::super::{Result, VsockError};
|
use super::super::{Result, VsockError};
|
||||||
use super::{VsockBackend, VsockBackendType, VsockStream};
|
use super::{VsockBackend, VsockBackendType, VsockStream};
|
||||||
|
|
||||||
pub struct HybridUnixStreamBackend {
|
|
||||||
pub unix_stream: Box<dyn VsockStream>,
|
|
||||||
pub slave_stream: Option<Box<dyn VsockStream>>,
|
|
||||||
}
|
|
||||||
|
|
||||||
impl VsockStream for HybridUnixStreamBackend {
|
|
||||||
fn backend_type(&self) -> VsockBackendType {
|
|
||||||
self.unix_stream.backend_type()
|
|
||||||
}
|
|
||||||
|
|
||||||
fn set_nonblocking(&mut self, nonblocking: bool) -> std::io::Result<()> {
|
|
||||||
self.unix_stream.set_nonblocking(nonblocking)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn set_read_timeout(&mut self, dur: Option<Duration>) -> std::io::Result<()> {
|
|
||||||
self.unix_stream.set_read_timeout(dur)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn set_write_timeout(&mut self, dur: Option<Duration>) -> std::io::Result<()> {
|
|
||||||
self.unix_stream.set_write_timeout(dur)
|
|
||||||
}
|
|
||||||
|
|
||||||
fn as_any(&self) -> &dyn Any {
|
|
||||||
self.unix_stream.as_any()
|
|
||||||
}
|
|
||||||
|
|
||||||
fn recv_data_fd(&self, bytes: &mut [u8], fds: &mut [RawFd]) -> std::io::Result<(usize, usize)> {
|
|
||||||
self.unix_stream.recv_data_fd(bytes, fds)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl AsRawFd for HybridUnixStreamBackend {
|
|
||||||
fn as_raw_fd(&self) -> RawFd {
|
|
||||||
self.unix_stream.as_raw_fd()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Read for HybridUnixStreamBackend {
|
|
||||||
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
|
|
||||||
self.unix_stream.read(buf)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl Write for HybridUnixStreamBackend {
|
|
||||||
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
|
|
||||||
// The slave stream was only used to reply the connect result "ok <port>",
|
|
||||||
// thus it was only used once here, and the data would be replied by the
|
|
||||||
// main stream.
|
|
||||||
if let Some(mut stream) = self.slave_stream.take() {
|
|
||||||
stream.write(buf)
|
|
||||||
} else {
|
|
||||||
self.unix_stream.write(buf)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fn flush(&mut self) -> std::io::Result<()> {
|
|
||||||
self.unix_stream.flush()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
impl VsockStream for UnixStream {
|
impl VsockStream for UnixStream {
|
||||||
fn backend_type(&self) -> VsockBackendType {
|
fn backend_type(&self) -> VsockBackendType {
|
||||||
VsockBackendType::UnixStream
|
VsockBackendType::UnixStream
|
||||||
|
@ -60,6 +60,10 @@ pub enum Error {
|
|||||||
#[error("error connecting to a backend: {0}")]
|
#[error("error connecting to a backend: {0}")]
|
||||||
BackendConnect(#[source] std::io::Error),
|
BackendConnect(#[source] std::io::Error),
|
||||||
|
|
||||||
|
/// Error set nonblock to a backend stream.
|
||||||
|
#[error("error set nonblocking to a backend: {0}")]
|
||||||
|
BackendSetNonBlock(#[source] std::io::Error),
|
||||||
|
|
||||||
/// Error reading from backend.
|
/// Error reading from backend.
|
||||||
#[error("error reading from backend: {0}")]
|
#[error("error reading from backend: {0}")]
|
||||||
BackendRead(#[source] std::io::Error),
|
BackendRead(#[source] std::io::Error),
|
||||||
|
@ -36,14 +36,14 @@
|
|||||||
/// route all these events to their handlers, the muxer uses another
|
/// route all these events to their handlers, the muxer uses another
|
||||||
/// `HashMap` object, mapping `RawFd`s to `EpollListener`s.
|
/// `HashMap` object, mapping `RawFd`s to `EpollListener`s.
|
||||||
use std::collections::{HashMap, HashSet};
|
use std::collections::{HashMap, HashSet};
|
||||||
|
use std::fs::File;
|
||||||
use std::io::Read;
|
use std::io::Read;
|
||||||
use std::os::fd::FromRawFd;
|
use std::os::fd::FromRawFd;
|
||||||
use std::os::unix::io::{AsRawFd, RawFd};
|
use std::os::unix::io::{AsRawFd, RawFd};
|
||||||
use std::os::unix::net::UnixStream;
|
|
||||||
|
|
||||||
use log::{debug, error, info, trace, warn};
|
use log::{debug, error, info, trace, warn};
|
||||||
|
|
||||||
use super::super::backend::{HybridUnixStreamBackend, VsockBackend, VsockBackendType, VsockStream};
|
use super::super::backend::{HybridStream, VsockBackend, VsockBackendType, VsockStream};
|
||||||
|
|
||||||
use super::super::csm::{ConnState, VsockConnection};
|
use super::super::csm::{ConnState, VsockConnection};
|
||||||
use super::super::defs::uapi;
|
use super::super::defs::uapi;
|
||||||
@ -480,13 +480,17 @@ impl VsockMuxer {
|
|||||||
.and_then(|(nfd, local_port, peer_port)| {
|
.and_then(|(nfd, local_port, peer_port)| {
|
||||||
// Here we should make sure the nfd the sole owner to convert it
|
// Here we should make sure the nfd the sole owner to convert it
|
||||||
// into an UnixStream object, otherwise, it could cause memory unsafety.
|
// into an UnixStream object, otherwise, it could cause memory unsafety.
|
||||||
let nstream = unsafe { UnixStream::from_raw_fd(nfd) };
|
let nstream = unsafe { File::from_raw_fd(nfd) };
|
||||||
|
|
||||||
let hybridstream = HybridUnixStreamBackend {
|
let mut hybridstream = HybridStream {
|
||||||
unix_stream: Box::new(nstream),
|
hybrid_stream: nstream,
|
||||||
slave_stream: Some(stream),
|
slave_stream: Some(stream),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
hybridstream
|
||||||
|
.set_nonblocking(true)
|
||||||
|
.map_err(Error::BackendSetNonBlock)?;
|
||||||
|
|
||||||
self.add_connection(
|
self.add_connection(
|
||||||
ConnMapKey {
|
ConnMapKey {
|
||||||
local_port,
|
local_port,
|
||||||
|
Loading…
Reference in New Issue
Block a user