dragonball: vsock add fifo/pipe stream support for passed fd hybridStream

Since the passed fd through unix socket would be any
stream fd such as pipe/fifo fd or any other socket
fd, thus we should deal with it as a normal hybrid
stream instead of a unix stream.

Fixes:#7584

Signed-off-by: Fupan Li <fupan.lfp@antgroup.com>
This commit is contained in:
Fupan Li 2023-07-21 16:09:44 +08:00
parent b098960442
commit 39e67b06e9
5 changed files with 111 additions and 67 deletions

View File

@ -0,0 +1,94 @@
// Copyright 2023 Ant Group. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
use std::any::Any;
use std::io::{Error, Read, Write};
use std::os::unix::io::{AsRawFd, RawFd};
use std::time::Duration;
use log::error;
use nix::errno::Errno;
use super::{VsockBackendType, VsockStream};
pub struct HybridStream {
pub hybrid_stream: std::fs::File,
pub slave_stream: Option<Box<dyn VsockStream>>,
}
impl AsRawFd for HybridStream {
fn as_raw_fd(&self) -> RawFd {
self.hybrid_stream.as_raw_fd()
}
}
impl Read for HybridStream {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
self.hybrid_stream.read(buf)
}
}
impl Write for HybridStream {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
// The slave stream was only used to reply the connect result "ok <port>",
// thus it was only used once here, and the data would be replied by the
// main stream.
if let Some(mut stream) = self.slave_stream.take() {
stream.write(buf)
} else {
self.hybrid_stream.write(buf)
}
}
fn flush(&mut self) -> std::io::Result<()> {
self.hybrid_stream.flush()
}
}
impl VsockStream for HybridStream {
fn backend_type(&self) -> VsockBackendType {
VsockBackendType::HybridStream
}
fn set_nonblocking(&mut self, nonblocking: bool) -> std::io::Result<()> {
let fd = self.hybrid_stream.as_raw_fd();
let mut flag = unsafe { libc::fcntl(fd, libc::F_GETFL) };
if nonblocking {
flag = flag | libc::O_NONBLOCK;
} else {
flag = flag & !libc::O_NONBLOCK;
}
let ret = unsafe { libc::fcntl(fd, libc::F_SETFL, flag) };
if ret < 0 {
error!("failed to set fcntl for fd {} with ret {}", fd, ret);
return Err(Error::last_os_error());
}
Ok(())
}
fn set_read_timeout(&mut self, _dur: Option<Duration>) -> std::io::Result<()> {
error!("unsupported!");
Err(Errno::ENOPROTOOPT.into())
}
fn set_write_timeout(&mut self, _dur: Option<Duration>) -> std::io::Result<()> {
error!("unsupported!");
Err(Errno::ENOPROTOOPT.into())
}
fn as_any(&self) -> &dyn Any {
self
}
fn recv_data_fd(
&self,
_bytes: &mut [u8],
_fds: &mut [RawFd],
) -> std::io::Result<(usize, usize)> {
Err(Errno::ENOPROTOOPT.into())
}
}

View File

@ -9,13 +9,14 @@ use std::io::{Read, Write};
use std::os::unix::io::{AsRawFd, RawFd}; use std::os::unix::io::{AsRawFd, RawFd};
use std::time::Duration; use std::time::Duration;
mod hybrid_stream;
mod inner; mod inner;
mod tcp; mod tcp;
mod unix_stream; mod unix_stream;
pub use self::hybrid_stream::HybridStream;
pub use self::inner::{VsockInnerBackend, VsockInnerConnector, VsockInnerStream}; pub use self::inner::{VsockInnerBackend, VsockInnerConnector, VsockInnerStream};
pub use self::tcp::VsockTcpBackend; pub use self::tcp::VsockTcpBackend;
pub use self::unix_stream::HybridUnixStreamBackend;
pub use self::unix_stream::VsockUnixStreamBackend; pub use self::unix_stream::VsockUnixStreamBackend;
/// The type of vsock backend. /// The type of vsock backend.
@ -27,6 +28,8 @@ pub enum VsockBackendType {
Tcp, Tcp,
/// Inner backend /// Inner backend
Inner, Inner,
/// Fd passed hybrid stream backend
HybridStream,
/// For test purpose /// For test purpose
#[cfg(test)] #[cfg(test)]
Test, Test,

View File

@ -2,7 +2,6 @@
// SPDX-License-Identifier: Apache-2.0 // SPDX-License-Identifier: Apache-2.0
use std::any::Any; use std::any::Any;
use std::io::{Read, Write};
use std::os::unix::io::{AsRawFd, RawFd}; use std::os::unix::io::{AsRawFd, RawFd};
use std::os::unix::net::{UnixListener, UnixStream}; use std::os::unix::net::{UnixListener, UnixStream};
use std::time::Duration; use std::time::Duration;
@ -13,66 +12,6 @@ use sendfd::RecvWithFd;
use super::super::{Result, VsockError}; use super::super::{Result, VsockError};
use super::{VsockBackend, VsockBackendType, VsockStream}; use super::{VsockBackend, VsockBackendType, VsockStream};
pub struct HybridUnixStreamBackend {
pub unix_stream: Box<dyn VsockStream>,
pub slave_stream: Option<Box<dyn VsockStream>>,
}
impl VsockStream for HybridUnixStreamBackend {
fn backend_type(&self) -> VsockBackendType {
self.unix_stream.backend_type()
}
fn set_nonblocking(&mut self, nonblocking: bool) -> std::io::Result<()> {
self.unix_stream.set_nonblocking(nonblocking)
}
fn set_read_timeout(&mut self, dur: Option<Duration>) -> std::io::Result<()> {
self.unix_stream.set_read_timeout(dur)
}
fn set_write_timeout(&mut self, dur: Option<Duration>) -> std::io::Result<()> {
self.unix_stream.set_write_timeout(dur)
}
fn as_any(&self) -> &dyn Any {
self.unix_stream.as_any()
}
fn recv_data_fd(&self, bytes: &mut [u8], fds: &mut [RawFd]) -> std::io::Result<(usize, usize)> {
self.unix_stream.recv_data_fd(bytes, fds)
}
}
impl AsRawFd for HybridUnixStreamBackend {
fn as_raw_fd(&self) -> RawFd {
self.unix_stream.as_raw_fd()
}
}
impl Read for HybridUnixStreamBackend {
fn read(&mut self, buf: &mut [u8]) -> std::io::Result<usize> {
self.unix_stream.read(buf)
}
}
impl Write for HybridUnixStreamBackend {
fn write(&mut self, buf: &[u8]) -> std::io::Result<usize> {
// The slave stream was only used to reply the connect result "ok <port>",
// thus it was only used once here, and the data would be replied by the
// main stream.
if let Some(mut stream) = self.slave_stream.take() {
stream.write(buf)
} else {
self.unix_stream.write(buf)
}
}
fn flush(&mut self) -> std::io::Result<()> {
self.unix_stream.flush()
}
}
impl VsockStream for UnixStream { impl VsockStream for UnixStream {
fn backend_type(&self) -> VsockBackendType { fn backend_type(&self) -> VsockBackendType {
VsockBackendType::UnixStream VsockBackendType::UnixStream

View File

@ -60,6 +60,10 @@ pub enum Error {
#[error("error connecting to a backend: {0}")] #[error("error connecting to a backend: {0}")]
BackendConnect(#[source] std::io::Error), BackendConnect(#[source] std::io::Error),
/// Error set nonblock to a backend stream.
#[error("error set nonblocking to a backend: {0}")]
BackendSetNonBlock(#[source] std::io::Error),
/// Error reading from backend. /// Error reading from backend.
#[error("error reading from backend: {0}")] #[error("error reading from backend: {0}")]
BackendRead(#[source] std::io::Error), BackendRead(#[source] std::io::Error),

View File

@ -36,14 +36,14 @@
/// route all these events to their handlers, the muxer uses another /// route all these events to their handlers, the muxer uses another
/// `HashMap` object, mapping `RawFd`s to `EpollListener`s. /// `HashMap` object, mapping `RawFd`s to `EpollListener`s.
use std::collections::{HashMap, HashSet}; use std::collections::{HashMap, HashSet};
use std::fs::File;
use std::io::Read; use std::io::Read;
use std::os::fd::FromRawFd; use std::os::fd::FromRawFd;
use std::os::unix::io::{AsRawFd, RawFd}; use std::os::unix::io::{AsRawFd, RawFd};
use std::os::unix::net::UnixStream;
use log::{debug, error, info, trace, warn}; use log::{debug, error, info, trace, warn};
use super::super::backend::{HybridUnixStreamBackend, VsockBackend, VsockBackendType, VsockStream}; use super::super::backend::{HybridStream, VsockBackend, VsockBackendType, VsockStream};
use super::super::csm::{ConnState, VsockConnection}; use super::super::csm::{ConnState, VsockConnection};
use super::super::defs::uapi; use super::super::defs::uapi;
@ -480,13 +480,17 @@ impl VsockMuxer {
.and_then(|(nfd, local_port, peer_port)| { .and_then(|(nfd, local_port, peer_port)| {
// Here we should make sure the nfd the sole owner to convert it // Here we should make sure the nfd the sole owner to convert it
// into an UnixStream object, otherwise, it could cause memory unsafety. // into an UnixStream object, otherwise, it could cause memory unsafety.
let nstream = unsafe { UnixStream::from_raw_fd(nfd) }; let nstream = unsafe { File::from_raw_fd(nfd) };
let hybridstream = HybridUnixStreamBackend { let mut hybridstream = HybridStream {
unix_stream: Box::new(nstream), hybrid_stream: nstream,
slave_stream: Some(stream), slave_stream: Some(stream),
}; };
hybridstream
.set_nonblocking(true)
.map_err(Error::BackendSetNonBlock)?;
self.add_connection( self.add_connection(
ConnMapKey { ConnMapKey {
local_port, local_port,