diff --git a/src/dragonball/src/dbs_virtio_devices/src/vsock/backend/hybrid_stream.rs b/src/dragonball/src/dbs_virtio_devices/src/vsock/backend/hybrid_stream.rs new file mode 100644 index 0000000000..f566e0d694 --- /dev/null +++ b/src/dragonball/src/dbs_virtio_devices/src/vsock/backend/hybrid_stream.rs @@ -0,0 +1,94 @@ +// Copyright 2023 Ant Group. All Rights Reserved. +// SPDX-License-Identifier: Apache-2.0 + +use std::any::Any; +use std::io::{Error, Read, Write}; +use std::os::unix::io::{AsRawFd, RawFd}; +use std::time::Duration; + +use log::error; +use nix::errno::Errno; + +use super::{VsockBackendType, VsockStream}; + +pub struct HybridStream { + pub hybrid_stream: std::fs::File, + pub slave_stream: Option>, +} + +impl AsRawFd for HybridStream { + fn as_raw_fd(&self) -> RawFd { + self.hybrid_stream.as_raw_fd() + } +} + +impl Read for HybridStream { + fn read(&mut self, buf: &mut [u8]) -> std::io::Result { + self.hybrid_stream.read(buf) + } +} + +impl Write for HybridStream { + fn write(&mut self, buf: &[u8]) -> std::io::Result { + // The slave stream was only used to reply the connect result "ok ", + // thus it was only used once here, and the data would be replied by the + // main stream. + if let Some(mut stream) = self.slave_stream.take() { + stream.write(buf) + } else { + self.hybrid_stream.write(buf) + } + } + + fn flush(&mut self) -> std::io::Result<()> { + self.hybrid_stream.flush() + } +} + +impl VsockStream for HybridStream { + fn backend_type(&self) -> VsockBackendType { + VsockBackendType::HybridStream + } + + fn set_nonblocking(&mut self, nonblocking: bool) -> std::io::Result<()> { + let fd = self.hybrid_stream.as_raw_fd(); + let mut flag = unsafe { libc::fcntl(fd, libc::F_GETFL) }; + + if nonblocking { + flag = flag | libc::O_NONBLOCK; + } else { + flag = flag & !libc::O_NONBLOCK; + } + + let ret = unsafe { libc::fcntl(fd, libc::F_SETFL, flag) }; + + if ret < 0 { + error!("failed to set fcntl for fd {} with ret {}", fd, ret); + return Err(Error::last_os_error()); + } + + Ok(()) + } + + fn set_read_timeout(&mut self, _dur: Option) -> std::io::Result<()> { + error!("unsupported!"); + Err(Errno::ENOPROTOOPT.into()) + } + + fn set_write_timeout(&mut self, _dur: Option) -> std::io::Result<()> { + error!("unsupported!"); + Err(Errno::ENOPROTOOPT.into()) + } + + fn as_any(&self) -> &dyn Any { + self + } + + fn recv_data_fd( + &self, + _bytes: &mut [u8], + _fds: &mut [RawFd], + ) -> std::io::Result<(usize, usize)> { + Err(Errno::ENOPROTOOPT.into()) + } +} diff --git a/src/dragonball/src/dbs_virtio_devices/src/vsock/backend/mod.rs b/src/dragonball/src/dbs_virtio_devices/src/vsock/backend/mod.rs index 4f555c77d0..bc916b33c5 100644 --- a/src/dragonball/src/dbs_virtio_devices/src/vsock/backend/mod.rs +++ b/src/dragonball/src/dbs_virtio_devices/src/vsock/backend/mod.rs @@ -9,13 +9,14 @@ use std::io::{Read, Write}; use std::os::unix::io::{AsRawFd, RawFd}; use std::time::Duration; +mod hybrid_stream; mod inner; mod tcp; mod unix_stream; +pub use self::hybrid_stream::HybridStream; pub use self::inner::{VsockInnerBackend, VsockInnerConnector, VsockInnerStream}; pub use self::tcp::VsockTcpBackend; -pub use self::unix_stream::HybridUnixStreamBackend; pub use self::unix_stream::VsockUnixStreamBackend; /// The type of vsock backend. @@ -27,6 +28,8 @@ pub enum VsockBackendType { Tcp, /// Inner backend Inner, + /// Fd passed hybrid stream backend + HybridStream, /// For test purpose #[cfg(test)] Test, diff --git a/src/dragonball/src/dbs_virtio_devices/src/vsock/backend/unix_stream.rs b/src/dragonball/src/dbs_virtio_devices/src/vsock/backend/unix_stream.rs index 7b268b4f41..8f03a836df 100644 --- a/src/dragonball/src/dbs_virtio_devices/src/vsock/backend/unix_stream.rs +++ b/src/dragonball/src/dbs_virtio_devices/src/vsock/backend/unix_stream.rs @@ -2,7 +2,6 @@ // SPDX-License-Identifier: Apache-2.0 use std::any::Any; -use std::io::{Read, Write}; use std::os::unix::io::{AsRawFd, RawFd}; use std::os::unix::net::{UnixListener, UnixStream}; use std::time::Duration; @@ -13,66 +12,6 @@ use sendfd::RecvWithFd; use super::super::{Result, VsockError}; use super::{VsockBackend, VsockBackendType, VsockStream}; -pub struct HybridUnixStreamBackend { - pub unix_stream: Box, - pub slave_stream: Option>, -} - -impl VsockStream for HybridUnixStreamBackend { - fn backend_type(&self) -> VsockBackendType { - self.unix_stream.backend_type() - } - - fn set_nonblocking(&mut self, nonblocking: bool) -> std::io::Result<()> { - self.unix_stream.set_nonblocking(nonblocking) - } - - fn set_read_timeout(&mut self, dur: Option) -> std::io::Result<()> { - self.unix_stream.set_read_timeout(dur) - } - - fn set_write_timeout(&mut self, dur: Option) -> std::io::Result<()> { - self.unix_stream.set_write_timeout(dur) - } - - fn as_any(&self) -> &dyn Any { - self.unix_stream.as_any() - } - - fn recv_data_fd(&self, bytes: &mut [u8], fds: &mut [RawFd]) -> std::io::Result<(usize, usize)> { - self.unix_stream.recv_data_fd(bytes, fds) - } -} - -impl AsRawFd for HybridUnixStreamBackend { - fn as_raw_fd(&self) -> RawFd { - self.unix_stream.as_raw_fd() - } -} - -impl Read for HybridUnixStreamBackend { - fn read(&mut self, buf: &mut [u8]) -> std::io::Result { - self.unix_stream.read(buf) - } -} - -impl Write for HybridUnixStreamBackend { - fn write(&mut self, buf: &[u8]) -> std::io::Result { - // The slave stream was only used to reply the connect result "ok ", - // thus it was only used once here, and the data would be replied by the - // main stream. - if let Some(mut stream) = self.slave_stream.take() { - stream.write(buf) - } else { - self.unix_stream.write(buf) - } - } - - fn flush(&mut self) -> std::io::Result<()> { - self.unix_stream.flush() - } -} - impl VsockStream for UnixStream { fn backend_type(&self) -> VsockBackendType { VsockBackendType::UnixStream diff --git a/src/dragonball/src/dbs_virtio_devices/src/vsock/muxer/mod.rs b/src/dragonball/src/dbs_virtio_devices/src/vsock/muxer/mod.rs index 2e71adefa9..b6b7db1cb4 100644 --- a/src/dragonball/src/dbs_virtio_devices/src/vsock/muxer/mod.rs +++ b/src/dragonball/src/dbs_virtio_devices/src/vsock/muxer/mod.rs @@ -60,6 +60,10 @@ pub enum Error { #[error("error connecting to a backend: {0}")] BackendConnect(#[source] std::io::Error), + /// Error set nonblock to a backend stream. + #[error("error set nonblocking to a backend: {0}")] + BackendSetNonBlock(#[source] std::io::Error), + /// Error reading from backend. #[error("error reading from backend: {0}")] BackendRead(#[source] std::io::Error), diff --git a/src/dragonball/src/dbs_virtio_devices/src/vsock/muxer/muxer_impl.rs b/src/dragonball/src/dbs_virtio_devices/src/vsock/muxer/muxer_impl.rs index 1b5b7c4e92..bfcc5fa1c8 100644 --- a/src/dragonball/src/dbs_virtio_devices/src/vsock/muxer/muxer_impl.rs +++ b/src/dragonball/src/dbs_virtio_devices/src/vsock/muxer/muxer_impl.rs @@ -36,14 +36,14 @@ /// route all these events to their handlers, the muxer uses another /// `HashMap` object, mapping `RawFd`s to `EpollListener`s. use std::collections::{HashMap, HashSet}; +use std::fs::File; use std::io::Read; use std::os::fd::FromRawFd; use std::os::unix::io::{AsRawFd, RawFd}; -use std::os::unix::net::UnixStream; use log::{debug, error, info, trace, warn}; -use super::super::backend::{HybridUnixStreamBackend, VsockBackend, VsockBackendType, VsockStream}; +use super::super::backend::{HybridStream, VsockBackend, VsockBackendType, VsockStream}; use super::super::csm::{ConnState, VsockConnection}; use super::super::defs::uapi; @@ -480,13 +480,17 @@ impl VsockMuxer { .and_then(|(nfd, local_port, peer_port)| { // Here we should make sure the nfd the sole owner to convert it // into an UnixStream object, otherwise, it could cause memory unsafety. - let nstream = unsafe { UnixStream::from_raw_fd(nfd) }; + let nstream = unsafe { File::from_raw_fd(nfd) }; - let hybridstream = HybridUnixStreamBackend { - unix_stream: Box::new(nstream), + let mut hybridstream = HybridStream { + hybrid_stream: nstream, slave_stream: Some(stream), }; + hybridstream + .set_nonblocking(true) + .map_err(Error::BackendSetNonBlock)?; + self.add_connection( ConnMapKey { local_port,