runtime: Fix virtiofs fd leak

The kata runtime invokes removeStaleVirtiofsShareMounts after
a container is stopped to clean up the stale virtiofs file caches.

Fixes: #6455
Signed-off-by: Feng Wang <fwang@confluent.io>
This commit is contained in:
Feng Wang 2023-04-26 15:38:08 -07:00
parent 509bc8b6c8
commit 205909fbed
12 changed files with 521 additions and 311 deletions

View File

@ -35,7 +35,7 @@ use crate::log_child;
// struct is populated from the content in the /proc/<pid>/mountinfo file. // struct is populated from the content in the /proc/<pid>/mountinfo file.
#[derive(std::fmt::Debug, PartialEq)] #[derive(std::fmt::Debug, PartialEq)]
pub struct Info { pub struct Info {
mount_point: String, pub mount_point: String,
optional: String, optional: String,
fstype: String, fstype: String,
} }
@ -553,7 +553,7 @@ fn rootfs_parent_mount_private(path: &str) -> Result<()> {
// Parse /proc/self/mountinfo because comparing Dev and ino does not work from // Parse /proc/self/mountinfo because comparing Dev and ino does not work from
// bind mounts // bind mounts
fn parse_mount_table(mountinfo_path: &str) -> Result<Vec<Info>> { pub fn parse_mount_table(mountinfo_path: &str) -> Result<Vec<Info>> {
let file = File::open(mountinfo_path)?; let file = File::open(mountinfo_path)?;
let reader = BufReader::new(file); let reader = BufReader::new(file);
let mut infos = Vec::new(); let mut infos = Vec::new();

View File

@ -40,6 +40,7 @@ use protocols::types::Interface;
use protocols::{agent_ttrpc_async as agent_ttrpc, health_ttrpc_async as health_ttrpc}; use protocols::{agent_ttrpc_async as agent_ttrpc, health_ttrpc_async as health_ttrpc};
use rustjail::cgroups::notifier; use rustjail::cgroups::notifier;
use rustjail::container::{BaseContainer, Container, LinuxContainer, SYSTEMD_CGROUP_PATH_FORMAT}; use rustjail::container::{BaseContainer, Container, LinuxContainer, SYSTEMD_CGROUP_PATH_FORMAT};
use rustjail::mount::parse_mount_table;
use rustjail::process::Process; use rustjail::process::Process;
use rustjail::specconv::CreateOpts; use rustjail::specconv::CreateOpts;
@ -96,6 +97,7 @@ const USR_IP6TABLES_SAVE: &str = "/usr/sbin/ip6tables-save";
const IP6TABLES_SAVE: &str = "/sbin/ip6tables-save"; const IP6TABLES_SAVE: &str = "/sbin/ip6tables-save";
const USR_IP6TABLES_RESTORE: &str = "/usr/sbin/ip6tables-save"; const USR_IP6TABLES_RESTORE: &str = "/usr/sbin/ip6tables-save";
const IP6TABLES_RESTORE: &str = "/sbin/ip6tables-restore"; const IP6TABLES_RESTORE: &str = "/sbin/ip6tables-restore";
const KATA_GUEST_SHARE_DIR: &str = "/run/kata-containers/shared/containers/";
const ERR_CANNOT_GET_WRITER: &str = "Cannot get writer"; const ERR_CANNOT_GET_WRITER: &str = "Cannot get writer";
const ERR_INVALID_BLOCK_SIZE: &str = "Invalid block size"; const ERR_INVALID_BLOCK_SIZE: &str = "Invalid block size";
@ -829,6 +831,29 @@ impl agent_ttrpc::AgentService for AgentService {
Ok(Empty::new()) Ok(Empty::new())
} }
async fn remove_stale_virtiofs_share_mounts(
&self,
ctx: &TtrpcContext,
req: protocols::agent::RemoveStaleVirtiofsShareMountsRequest,
) -> ttrpc::Result<Empty> {
trace_rpc_call!(ctx, "remove_stale_virtiofs_share_mounts", req);
is_allowed!(req);
let mount_infos = parse_mount_table("/proc/self/mountinfo")
.map_err(|e| ttrpc_error!(ttrpc::Code::INTERNAL, e))?;
for m in &mount_infos {
if m.mount_point.starts_with(KATA_GUEST_SHARE_DIR) {
// stat the mount point, virtiofs daemon will remove the stale cache and release the fds if the mount point doesn't exist any more.
// More details in https://github.com/kata-containers/kata-containers/issues/6455#issuecomment-1477137277
match stat::stat(Path::new(&m.mount_point)) {
Ok(_) => info!(sl!(), "stat {} success", m.mount_point),
Err(e) => info!(sl!(), "stat {} failed: {}", m.mount_point, e),
}
}
}
Ok(Empty::new())
}
async fn write_stdin( async fn write_stdin(
&self, &self,
_ctx: &TtrpcContext, _ctx: &TtrpcContext,

View File

@ -38,6 +38,7 @@ service AgentService {
rpc StatsContainer(StatsContainerRequest) returns (StatsContainerResponse); rpc StatsContainer(StatsContainerRequest) returns (StatsContainerResponse);
rpc PauseContainer(PauseContainerRequest) returns (google.protobuf.Empty); rpc PauseContainer(PauseContainerRequest) returns (google.protobuf.Empty);
rpc ResumeContainer(ResumeContainerRequest) returns (google.protobuf.Empty); rpc ResumeContainer(ResumeContainerRequest) returns (google.protobuf.Empty);
rpc RemoveStaleVirtiofsShareMounts(RemoveStaleVirtiofsShareMountsRequest) returns (google.protobuf.Empty);
// stdio // stdio
rpc WriteStdin(WriteStreamRequest) returns (WriteStreamResponse); rpc WriteStdin(WriteStreamRequest) returns (WriteStreamResponse);
@ -301,6 +302,8 @@ message CreateSandboxRequest {
message DestroySandboxRequest { message DestroySandboxRequest {
} }
message RemoveStaleVirtiofsShareMountsRequest {}
message Interfaces { message Interfaces {
repeated types.Interface Interfaces = 1; repeated types.Interface Interfaces = 1;
} }

View File

@ -138,6 +138,9 @@ type agent interface {
// resumeContainer will resume a paused container // resumeContainer will resume a paused container
resumeContainer(ctx context.Context, sandbox *Sandbox, c Container) error resumeContainer(ctx context.Context, sandbox *Sandbox, c Container) error
// removeStaleVirtiofsShareMounts will tell the agent to remove stale virtiofs share mounts in the guest.
removeStaleVirtiofsShareMounts(ctx context.Context) error
// configure will update agent settings based on provided arguments // configure will update agent settings based on provided arguments
configure(ctx context.Context, h Hypervisor, id, sharePath string, config KataAgentConfig) error configure(ctx context.Context, h Hypervisor, id, sharePath string, config KataAgentConfig) error

View File

@ -1020,6 +1020,10 @@ func (c *Container) stop(ctx context.Context, force bool) error {
} }
} }
if err := c.sandbox.agent.removeStaleVirtiofsShareMounts(ctx); err != nil && !force {
return err
}
if err := c.detachDevices(ctx); err != nil && !force { if err := c.detachDevices(ctx); err != nil && !force {
return err return err
} }

View File

@ -125,6 +125,7 @@ func TestUnmountHostMountsRemoveBindHostPath(t *testing.T) {
ctx: context.Background(), ctx: context.Background(),
id: "foobar", id: "foobar",
config: &SandboxConfig{}, config: &SandboxConfig{},
agent: newMockAgent(),
} }
fsShare, err := NewFilesystemShare(sandbox) fsShare, err := NewFilesystemShare(sandbox)

View File

@ -302,6 +302,7 @@ func TestMountSharedDirMounts(t *testing.T) {
sandbox := &Sandbox{ sandbox := &Sandbox{
ctx: context.Background(), ctx: context.Background(),
id: "foobar", id: "foobar",
agent: newMockAgent(),
hypervisor: &mockHypervisor{}, hypervisor: &mockHypervisor{},
config: &SandboxConfig{ config: &SandboxConfig{
HypervisorConfig: HypervisorConfig{ HypervisorConfig: HypervisorConfig{

View File

@ -113,41 +113,42 @@ var (
) )
const ( const (
grpcCheckRequest = "grpc.CheckRequest" grpcCheckRequest = "grpc.CheckRequest"
grpcExecProcessRequest = "grpc.ExecProcessRequest" grpcExecProcessRequest = "grpc.ExecProcessRequest"
grpcCreateSandboxRequest = "grpc.CreateSandboxRequest" grpcCreateSandboxRequest = "grpc.CreateSandboxRequest"
grpcDestroySandboxRequest = "grpc.DestroySandboxRequest" grpcDestroySandboxRequest = "grpc.DestroySandboxRequest"
grpcCreateContainerRequest = "grpc.CreateContainerRequest" grpcCreateContainerRequest = "grpc.CreateContainerRequest"
grpcStartContainerRequest = "grpc.StartContainerRequest" grpcStartContainerRequest = "grpc.StartContainerRequest"
grpcRemoveContainerRequest = "grpc.RemoveContainerRequest" grpcRemoveContainerRequest = "grpc.RemoveContainerRequest"
grpcSignalProcessRequest = "grpc.SignalProcessRequest" grpcSignalProcessRequest = "grpc.SignalProcessRequest"
grpcUpdateRoutesRequest = "grpc.UpdateRoutesRequest" grpcUpdateRoutesRequest = "grpc.UpdateRoutesRequest"
grpcUpdateInterfaceRequest = "grpc.UpdateInterfaceRequest" grpcUpdateInterfaceRequest = "grpc.UpdateInterfaceRequest"
grpcUpdateEphemeralMountsRequest = "grpc.UpdateEphemeralMountsRequest" grpcUpdateEphemeralMountsRequest = "grpc.UpdateEphemeralMountsRequest"
grpcListInterfacesRequest = "grpc.ListInterfacesRequest" grpcRemoveStaleVirtiofsShareMountsRequest = "grpc.RemoveStaleVirtiofsShareMountsRequest"
grpcListRoutesRequest = "grpc.ListRoutesRequest" grpcListInterfacesRequest = "grpc.ListInterfacesRequest"
grpcAddARPNeighborsRequest = "grpc.AddARPNeighborsRequest" grpcListRoutesRequest = "grpc.ListRoutesRequest"
grpcOnlineCPUMemRequest = "grpc.OnlineCPUMemRequest" grpcAddARPNeighborsRequest = "grpc.AddARPNeighborsRequest"
grpcUpdateContainerRequest = "grpc.UpdateContainerRequest" grpcOnlineCPUMemRequest = "grpc.OnlineCPUMemRequest"
grpcWaitProcessRequest = "grpc.WaitProcessRequest" grpcUpdateContainerRequest = "grpc.UpdateContainerRequest"
grpcTtyWinResizeRequest = "grpc.TtyWinResizeRequest" grpcWaitProcessRequest = "grpc.WaitProcessRequest"
grpcWriteStreamRequest = "grpc.WriteStreamRequest" grpcTtyWinResizeRequest = "grpc.TtyWinResizeRequest"
grpcCloseStdinRequest = "grpc.CloseStdinRequest" grpcWriteStreamRequest = "grpc.WriteStreamRequest"
grpcStatsContainerRequest = "grpc.StatsContainerRequest" grpcCloseStdinRequest = "grpc.CloseStdinRequest"
grpcPauseContainerRequest = "grpc.PauseContainerRequest" grpcStatsContainerRequest = "grpc.StatsContainerRequest"
grpcResumeContainerRequest = "grpc.ResumeContainerRequest" grpcPauseContainerRequest = "grpc.PauseContainerRequest"
grpcReseedRandomDevRequest = "grpc.ReseedRandomDevRequest" grpcResumeContainerRequest = "grpc.ResumeContainerRequest"
grpcGuestDetailsRequest = "grpc.GuestDetailsRequest" grpcReseedRandomDevRequest = "grpc.ReseedRandomDevRequest"
grpcMemHotplugByProbeRequest = "grpc.MemHotplugByProbeRequest" grpcGuestDetailsRequest = "grpc.GuestDetailsRequest"
grpcCopyFileRequest = "grpc.CopyFileRequest" grpcMemHotplugByProbeRequest = "grpc.MemHotplugByProbeRequest"
grpcSetGuestDateTimeRequest = "grpc.SetGuestDateTimeRequest" grpcCopyFileRequest = "grpc.CopyFileRequest"
grpcGetOOMEventRequest = "grpc.GetOOMEventRequest" grpcSetGuestDateTimeRequest = "grpc.SetGuestDateTimeRequest"
grpcGetMetricsRequest = "grpc.GetMetricsRequest" grpcGetOOMEventRequest = "grpc.GetOOMEventRequest"
grpcAddSwapRequest = "grpc.AddSwapRequest" grpcGetMetricsRequest = "grpc.GetMetricsRequest"
grpcVolumeStatsRequest = "grpc.VolumeStatsRequest" grpcAddSwapRequest = "grpc.AddSwapRequest"
grpcResizeVolumeRequest = "grpc.ResizeVolumeRequest" grpcVolumeStatsRequest = "grpc.VolumeStatsRequest"
grpcGetIPTablesRequest = "grpc.GetIPTablesRequest" grpcResizeVolumeRequest = "grpc.ResizeVolumeRequest"
grpcSetIPTablesRequest = "grpc.SetIPTablesRequest" grpcGetIPTablesRequest = "grpc.GetIPTablesRequest"
grpcSetIPTablesRequest = "grpc.SetIPTablesRequest"
) )
// newKataAgent returns an agent from an agent type. // newKataAgent returns an agent from an agent type.
@ -1947,6 +1948,11 @@ func (k *kataAgent) reseedRNG(ctx context.Context, data []byte) error {
return err return err
} }
func (k *kataAgent) removeStaleVirtiofsShareMounts(ctx context.Context) error {
_, err := k.sendReq(ctx, &grpc.RemoveStaleVirtiofsShareMountsRequest{})
return err
}
type reqFunc func(context.Context, interface{}) (interface{}, error) type reqFunc func(context.Context, interface{}) (interface{}, error)
func (k *kataAgent) installReqFunc(c *kataclient.AgentClient) { func (k *kataAgent) installReqFunc(c *kataclient.AgentClient) {
@ -2056,6 +2062,9 @@ func (k *kataAgent) installReqFunc(c *kataclient.AgentClient) {
k.reqHandlers[grpcSetIPTablesRequest] = func(ctx context.Context, req interface{}) (interface{}, error) { k.reqHandlers[grpcSetIPTablesRequest] = func(ctx context.Context, req interface{}) (interface{}, error) {
return k.client.AgentServiceClient.SetIPTables(ctx, req.(*grpc.SetIPTablesRequest)) return k.client.AgentServiceClient.SetIPTables(ctx, req.(*grpc.SetIPTablesRequest))
} }
k.reqHandlers[grpcRemoveStaleVirtiofsShareMountsRequest] = func(ctx context.Context, req interface{}) (interface{}, error) {
return k.client.AgentServiceClient.RemoveStaleVirtiofsShareMounts(ctx, req.(*grpc.RemoveStaleVirtiofsShareMountsRequest))
}
} }
func (k *kataAgent) getReqContext(ctx context.Context, reqName string) (newCtx context.Context, cancel context.CancelFunc) { func (k *kataAgent) getReqContext(ctx context.Context, reqName string) (newCtx context.Context, cancel context.CancelFunc) {

View File

@ -846,6 +846,7 @@ func TestAgentCreateContainer(t *testing.T) {
}, },
}, },
hypervisor: &mockHypervisor{}, hypervisor: &mockHypervisor{},
agent: newMockAgent(),
} }
fsShare, err := NewFilesystemShare(sandbox) fsShare, err := NewFilesystemShare(sandbox)

View File

@ -141,6 +141,11 @@ func (n *mockAgent) waitProcess(ctx context.Context, c *Container, processID str
return 0, nil return 0, nil
} }
// removeStaleVirtiofsShareMounts is the Noop agent removeStaleVirtiofsShareMounts implementation. It does nothing.
func (n *mockAgent) removeStaleVirtiofsShareMounts(ctx context.Context) error {
return nil
}
// winsizeProcess is the Noop agent process tty resizer. It does nothing. // winsizeProcess is the Noop agent process tty resizer. It does nothing.
func (n *mockAgent) winsizeProcess(ctx context.Context, c *Container, processID string, height, width uint32) error { func (n *mockAgent) winsizeProcess(ctx context.Context, c *Container, processID string, height, width uint32) error {
return nil return nil

View File

@ -245,6 +245,10 @@ func (p *HybridVSockTTRPCMockImp) ResizeVolume(ctx context.Context, req *pb.Resi
return &gpb.Empty{}, nil return &gpb.Empty{}, nil
} }
func (p *HybridVSockTTRPCMockImp) RemoveStaleVirtiofsShareMounts(ctx context.Context, req *pb.RemoveStaleVirtiofsShareMountsRequest) (*gpb.Empty, error) {
return &gpb.Empty{}, nil
}
func (p *HybridVSockTTRPCMockImp) GetIPTables(ctx context.Context, req *pb.GetIPTablesRequest) (*pb.GetIPTablesResponse, error) { func (p *HybridVSockTTRPCMockImp) GetIPTables(ctx context.Context, req *pb.GetIPTablesRequest) (*pb.GetIPTablesResponse, error) {
return &pb.GetIPTablesResponse{}, nil return &pb.GetIPTablesResponse{}, nil
} }