From 2e182e736b41d3e8d85e4a49fedfb53e1657448f Mon Sep 17 00:00:00 2001 From: zhifei92 Date: Fri, 20 Sep 2024 14:57:07 +0800 Subject: [PATCH] feat: Add cri proxy for e2e_node add example of using CRI proxy fix: Invalid function call fix: Optimize getPodImagePullDuration fix: Return error if the CRI Proxy is undefined chore: add a document --- test/e2e/feature/feature.go | 8 + test/e2e/framework/test_context.go | 2 + test/e2e_node/criproxy/endpoint.go | 33 ++ test/e2e_node/criproxy/proxy_image_service.go | 104 ++++ .../criproxy/proxy_runtime_service.go | 523 ++++++++++++++++++ test/e2e_node/criproxy_test.go | 172 ++++++ test/e2e_node/e2e_node_suite_test.go | 26 +- test/e2e_node/utils_linux.go | 20 + 8 files changed, 887 insertions(+), 1 deletion(-) create mode 100644 test/e2e_node/criproxy/endpoint.go create mode 100644 test/e2e_node/criproxy/proxy_image_service.go create mode 100644 test/e2e_node/criproxy/proxy_runtime_service.go create mode 100644 test/e2e_node/criproxy_test.go diff --git a/test/e2e/feature/feature.go b/test/e2e/feature/feature.go index c14ca8eaba0..2e50189b2f7 100644 --- a/test/e2e/feature/feature.go +++ b/test/e2e/feature/feature.go @@ -363,6 +363,14 @@ var ( // (used for testing fine-grained SupplementalGroups control ) SupplementalGroupsPolicy = framework.WithFeature(framework.ValidFeatures.Add("SupplementalGroupsPolicy")) + // Owner: sig-node + // Tests marked with this feature MUST run with the CRI Proxy configured so errors can be injected into the kubelet's CRI calls. + // This is useful for testing how the kubelet handles various error conditions in its CRI interactions. + // test-infra jobs: + // - pull-kubernetes-node-e2e-cri-proxy-serial (need manual trigger) + // - ci-kubernetes-node-e2e-cri-proxy-serial + CriProxy = framework.WithFeature(framework.ValidFeatures.Add("CriProxy")) + // Owner: sig-network // Marks tests that require a cluster with Topology Hints enabled. TopologyHints = framework.WithFeature(framework.ValidFeatures.Add("Topology Hints")) diff --git a/test/e2e/framework/test_context.go b/test/e2e/framework/test_context.go index ba65d757a6e..51a21e7d23e 100644 --- a/test/e2e/framework/test_context.go +++ b/test/e2e/framework/test_context.go @@ -277,6 +277,8 @@ type NodeTestContextType struct { ExtraEnvs map[string]string // StandaloneMode indicates whether the test is running kubelet in a standalone mode. StandaloneMode bool + // CriProxyEnabled indicates whether enable CRI API proxy for failure injection. + CriProxyEnabled bool } // CloudConfig holds the cloud configuration for e2e test suites. diff --git a/test/e2e_node/criproxy/endpoint.go b/test/e2e_node/criproxy/endpoint.go new file mode 100644 index 00000000000..8c89c14e3a2 --- /dev/null +++ b/test/e2e_node/criproxy/endpoint.go @@ -0,0 +1,33 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package criproxy + +import ( + "fmt" + + "k8s.io/apimachinery/pkg/util/rand" +) + +const ( + defaultUnixEndpoint = "unix:///tmp/kubelet_remote_proxy_%v.sock" +) + +// GenerateEndpoint generates a new unix socket server of grpc server. +func GenerateEndpoint() (string, error) { + // use random int be a part fo file name + return fmt.Sprintf(defaultUnixEndpoint, rand.Int()), nil +} diff --git a/test/e2e_node/criproxy/proxy_image_service.go b/test/e2e_node/criproxy/proxy_image_service.go new file mode 100644 index 00000000000..336a61528fe --- /dev/null +++ b/test/e2e_node/criproxy/proxy_image_service.go @@ -0,0 +1,104 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package criproxy + +import ( + "context" + + kubeapi "k8s.io/cri-api/pkg/apis/runtime/v1" +) + +const ( + ListImages = "ListImages" + ImageStatus = "ImageStatus" + PullImage = "PullImage" + RemoveImage = "RemoveImage" + ImageFsInfo = "ImageFsInfo" +) + +// ListImages lists existing images. +func (p *RemoteRuntime) ListImages(ctx context.Context, req *kubeapi.ListImagesRequest) (*kubeapi.ListImagesResponse, error) { + if err := p.runInjectors(ListImages); err != nil { + return nil, err + } + + images, err := p.imageService.ListImages(ctx, req.Filter) + if err != nil { + return nil, err + } + return &kubeapi.ListImagesResponse{ + Images: images, + }, nil +} + +// ImageStatus returns the status of the image. If the image is not +// present, returns a response with ImageStatusResponse.Image set to +// nil. +func (p *RemoteRuntime) ImageStatus(ctx context.Context, req *kubeapi.ImageStatusRequest) (*kubeapi.ImageStatusResponse, error) { + if err := p.runInjectors(ImageStatus); err != nil { + return nil, err + } + + resp, err := p.imageService.ImageStatus(ctx, req.Image, false) + if err != nil { + return nil, err + } + return resp, nil +} + +// PullImage pulls an image with authentication config. +func (p *RemoteRuntime) PullImage(ctx context.Context, req *kubeapi.PullImageRequest) (*kubeapi.PullImageResponse, error) { + if err := p.runInjectors(PullImage); err != nil { + return nil, err + } + + image, err := p.imageService.PullImage(ctx, req.Image, req.Auth, req.SandboxConfig) + if err != nil { + return nil, err + } + return &kubeapi.PullImageResponse{ + ImageRef: image, + }, nil +} + +// RemoveImage removes the image. +// This call is idempotent, and must not return an error if the image has +// already been removed. +func (p *RemoteRuntime) RemoveImage(ctx context.Context, req *kubeapi.RemoveImageRequest) (*kubeapi.RemoveImageResponse, error) { + if err := p.runInjectors(RemoveImage); err != nil { + return nil, err + } + + err := p.imageService.RemoveImage(ctx, req.Image) + if err != nil { + return nil, err + } + return &kubeapi.RemoveImageResponse{}, nil +} + +// ImageFsInfo returns information of the filesystem that is used to store images. +func (p *RemoteRuntime) ImageFsInfo(ctx context.Context, req *kubeapi.ImageFsInfoRequest) (*kubeapi.ImageFsInfoResponse, error) { + if err := p.runInjectors(ImageFsInfo); err != nil { + return nil, err + } + + resp, err := p.imageService.ImageFsInfo(ctx) + if err != nil { + return nil, err + } + return resp, nil +} diff --git a/test/e2e_node/criproxy/proxy_runtime_service.go b/test/e2e_node/criproxy/proxy_runtime_service.go new file mode 100644 index 00000000000..cc17e970c6b --- /dev/null +++ b/test/e2e_node/criproxy/proxy_runtime_service.go @@ -0,0 +1,523 @@ +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package criproxy + +import ( + "context" + "errors" + "fmt" + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" + + internalapi "k8s.io/cri-api/pkg/apis" + runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1" + "k8s.io/cri-client/pkg/util" + "k8s.io/kubernetes/test/e2e/framework" + utilexec "k8s.io/utils/exec" +) + +const ( + Version = "Version" + RunPodSandbox = "RunPodSandbox" + StopPodSandbox = "StopPodSandbox" + RemovePodSandbox = "RemovePodSandbox" + PodSandboxStatus = "PodSandboxStatus" + ListPodSandbox = "ListPodSandbox" + CreateContainer = "CreateContainer" + StartContainer = "StartContainer" + StopContainer = "StopContainer" + RemoveContainer = "RemoveContainer" + ListContainers = "ListContainers" + ContainerStatus = "ContainerStatus" + UpdateContainerResources = "UpdateContainerResources" + ReopenContainerLog = "ReopenContainerLog" + ExecSync = "ExecSync" + Exec = "Exec" + Attach = "Attach" + PortForward = "PortForward" + ContainerStats = "ContainerStats" + ListContainerStats = "ListContainerStats" + PodSandboxStats = "PodSandboxStats" + ListPodSandboxStats = "ListPodSandboxStats" + UpdateRuntimeConfig = "UpdateRuntimeConfig" + Status = "Status" + CheckpointContainer = "CheckpointContainer" + GetContainerEvents = "GetContainerEvents" + ListMetricDescriptors = "ListMetricDescriptors" + ListPodSandboxMetrics = "ListPodSandboxMetrics" + RuntimeConfig = "RuntimeConfig" +) + +// AddInjector inject the error or delay to the next call to the RuntimeService. +func (p *RemoteRuntime) AddInjector(injector func(string) error) { + p.injectors = append(p.injectors, injector) +} + +// ResetInjectors resets all registered injectors. +func (p *RemoteRuntime) ResetInjectors() { + p.injectors = []func(string) error{} +} + +func (p *RemoteRuntime) runInjectors(apiName string) error { + for _, injector := range p.injectors { + if err := injector(apiName); err != nil { + return err + } + } + return nil +} + +// RemoteRuntime represents a proxy for remote container runtime. +type RemoteRuntime struct { + server *grpc.Server + injectors []func(string) error + runtimeService internalapi.RuntimeService + imageService internalapi.ImageManagerService +} + +// NewRemoteRuntimeProxy creates a new RemoteRuntime. +func NewRemoteRuntimeProxy(runtimeService internalapi.RuntimeService, imageService internalapi.ImageManagerService) *RemoteRuntime { + p := &RemoteRuntime{ + server: grpc.NewServer(), + runtimeService: runtimeService, + imageService: imageService, + } + runtimeapi.RegisterRuntimeServiceServer(p.server, p) + runtimeapi.RegisterImageServiceServer(p.server, p) + + return p +} + +// Start starts the remote runtime proxy. +func (p *RemoteRuntime) Start(endpoint string) error { + l, err := util.CreateListener(endpoint) + if err != nil { + return fmt.Errorf("failed to listen on %q: %w", endpoint, err) + } + + go func() { + if err := p.server.Serve(l); err != nil { + framework.Failf("Failed to start cri proxy : %v", err) + } + }() + return nil +} + +// Stop stops the fake remote runtime proxy. +func (p *RemoteRuntime) Stop() { + p.server.Stop() +} + +// Version returns the runtime name, runtime version, and runtime API version. +func (p *RemoteRuntime) Version(ctx context.Context, req *runtimeapi.VersionRequest) (*runtimeapi.VersionResponse, error) { + if err := p.runInjectors(Version); err != nil { + return nil, err + } + return p.runtimeService.Version(ctx, req.Version) +} + +// RunPodSandbox creates and starts a pod-level sandbox. Runtimes must ensure +// the sandbox is in the ready state on success. +func (p *RemoteRuntime) RunPodSandbox(ctx context.Context, req *runtimeapi.RunPodSandboxRequest) (*runtimeapi.RunPodSandboxResponse, error) { + if err := p.runInjectors(RunPodSandbox); err != nil { + return nil, err + } + + sandboxID, err := p.runtimeService.RunPodSandbox(ctx, req.Config, req.RuntimeHandler) + if err != nil { + return nil, err + } + return &runtimeapi.RunPodSandboxResponse{PodSandboxId: sandboxID}, nil +} + +// StopPodSandbox stops any running process that is part of the sandbox and +// reclaims network resources (e.g., IP addresses) allocated to the sandbox. +// If there are any running containers in the sandbox, they must be forcibly +// terminated. +func (p *RemoteRuntime) StopPodSandbox(ctx context.Context, req *runtimeapi.StopPodSandboxRequest) (*runtimeapi.StopPodSandboxResponse, error) { + if err := p.runInjectors(StopPodSandbox); err != nil { + return nil, err + } + + err := p.runtimeService.StopPodSandbox(ctx, req.PodSandboxId) + if err != nil { + return nil, err + } + return &runtimeapi.StopPodSandboxResponse{}, nil +} + +// RemovePodSandbox removes the sandbox. If there are any running containers +// in the sandbox, they must be forcibly terminated and removed. +// This call is idempotent, and must not return an error if the sandbox has +// already been removed. +func (p *RemoteRuntime) RemovePodSandbox(ctx context.Context, req *runtimeapi.RemovePodSandboxRequest) (*runtimeapi.RemovePodSandboxResponse, error) { + if err := p.runInjectors(RemovePodSandbox); err != nil { + return nil, err + } + err := p.runtimeService.RemovePodSandbox(ctx, req.PodSandboxId) + if err != nil { + return nil, err + } + + return &runtimeapi.RemovePodSandboxResponse{}, nil +} + +// PodSandboxStatus returns the status of the PodSandbox. If the PodSandbox is not +// present, returns an error. +func (p *RemoteRuntime) PodSandboxStatus(ctx context.Context, req *runtimeapi.PodSandboxStatusRequest) (*runtimeapi.PodSandboxStatusResponse, error) { + if err := p.runInjectors(PodSandboxStatus); err != nil { + return nil, err + } + + resp, err := p.runtimeService.PodSandboxStatus(ctx, req.PodSandboxId, false) + if err != nil { + return nil, err + } + return resp, nil +} + +// ListPodSandbox returns a list of PodSandboxes. +func (p *RemoteRuntime) ListPodSandbox(ctx context.Context, req *runtimeapi.ListPodSandboxRequest) (*runtimeapi.ListPodSandboxResponse, error) { + if err := p.runInjectors(ListPodSandbox); err != nil { + return nil, err + } + + items, err := p.runtimeService.ListPodSandbox(ctx, req.Filter) + if err != nil { + return nil, err + } + return &runtimeapi.ListPodSandboxResponse{Items: items}, nil +} + +// CreateContainer creates a new container in specified PodSandbox +func (p *RemoteRuntime) CreateContainer(ctx context.Context, req *runtimeapi.CreateContainerRequest) (*runtimeapi.CreateContainerResponse, error) { + if err := p.runInjectors(CreateContainer); err != nil { + return nil, err + } + + containerID, err := p.runtimeService.CreateContainer(ctx, req.PodSandboxId, req.Config, req.SandboxConfig) + if err != nil { + return nil, err + } + return &runtimeapi.CreateContainerResponse{ContainerId: containerID}, nil +} + +// StartContainer starts the container. +func (p *RemoteRuntime) StartContainer(ctx context.Context, req *runtimeapi.StartContainerRequest) (*runtimeapi.StartContainerResponse, error) { + if err := p.runInjectors(StartContainer); err != nil { + return nil, err + } + + err := p.runtimeService.StartContainer(ctx, req.ContainerId) + if err != nil { + return nil, err + } + return &runtimeapi.StartContainerResponse{}, nil +} + +// StopContainer stops a running container with a grace period (i.e., timeout). +// This call is idempotent, and must not return an error if the container has +// already been stopped. +func (p *RemoteRuntime) StopContainer(ctx context.Context, req *runtimeapi.StopContainerRequest) (*runtimeapi.StopContainerResponse, error) { + if err := p.runInjectors(StopContainer); err != nil { + return nil, err + } + + err := p.runtimeService.StopContainer(ctx, req.ContainerId, req.Timeout) + if err != nil { + return nil, err + } + return &runtimeapi.StopContainerResponse{}, nil +} + +// RemoveContainer removes the container. If the container is running, the +// container must be forcibly removed. +// This call is idempotent, and must not return an error if the container has +// already been removed. +func (p *RemoteRuntime) RemoveContainer(ctx context.Context, req *runtimeapi.RemoveContainerRequest) (*runtimeapi.RemoveContainerResponse, error) { + if err := p.runInjectors(RemoveContainer); err != nil { + return nil, err + } + + err := p.runtimeService.RemoveContainer(ctx, req.ContainerId) + if err != nil { + return nil, err + } + return &runtimeapi.RemoveContainerResponse{}, nil +} + +// ListContainers lists all containers by filters. +func (p *RemoteRuntime) ListContainers(ctx context.Context, req *runtimeapi.ListContainersRequest) (*runtimeapi.ListContainersResponse, error) { + if err := p.runInjectors(ListContainers); err != nil { + return nil, err + } + + items, err := p.runtimeService.ListContainers(ctx, req.Filter) + if err != nil { + return nil, err + } + return &runtimeapi.ListContainersResponse{Containers: items}, nil +} + +// ContainerStatus returns status of the container. If the container is not +// present, returns an error. +func (p *RemoteRuntime) ContainerStatus(ctx context.Context, req *runtimeapi.ContainerStatusRequest) (*runtimeapi.ContainerStatusResponse, error) { + if err := p.runInjectors(ContainerStatus); err != nil { + return nil, err + } + + resp, err := p.runtimeService.ContainerStatus(ctx, req.ContainerId, false) + if err != nil { + return nil, err + } + return resp, nil +} + +// ExecSync runs a command in a container synchronously. +func (p *RemoteRuntime) ExecSync(ctx context.Context, req *runtimeapi.ExecSyncRequest) (*runtimeapi.ExecSyncResponse, error) { + if err := p.runInjectors(ExecSync); err != nil { + return nil, err + } + + var exitCode int32 + stdout, stderr, err := p.runtimeService.ExecSync(ctx, req.ContainerId, req.Cmd, time.Duration(req.Timeout)*time.Second) + if err != nil { + var exitError utilexec.ExitError + ok := errors.As(err, &exitError) + if !ok { + return nil, err + } + exitCode = int32(exitError.ExitStatus()) + } + return &runtimeapi.ExecSyncResponse{ + Stdout: stdout, + Stderr: stderr, + ExitCode: exitCode, + }, nil +} + +// Exec prepares a streaming endpoint to execute a command in the container. +func (p *RemoteRuntime) Exec(ctx context.Context, req *runtimeapi.ExecRequest) (*runtimeapi.ExecResponse, error) { + if err := p.runInjectors(Exec); err != nil { + return nil, err + } + + return p.runtimeService.Exec(ctx, req) +} + +// Attach prepares a streaming endpoint to attach to a running container. +func (p *RemoteRuntime) Attach(ctx context.Context, req *runtimeapi.AttachRequest) (*runtimeapi.AttachResponse, error) { + if err := p.runInjectors(Attach); err != nil { + return nil, err + } + + return p.runtimeService.Attach(ctx, req) +} + +// PortForward prepares a streaming endpoint to forward ports from a PodSandbox. +func (p *RemoteRuntime) PortForward(ctx context.Context, req *runtimeapi.PortForwardRequest) (*runtimeapi.PortForwardResponse, error) { + if err := p.runInjectors(PortForward); err != nil { + return nil, err + } + + return p.runtimeService.PortForward(ctx, req) +} + +// ContainerStats returns stats of the container. If the container does not +// exist, the call returns an error. +func (p *RemoteRuntime) ContainerStats(ctx context.Context, req *runtimeapi.ContainerStatsRequest) (*runtimeapi.ContainerStatsResponse, error) { + if err := p.runInjectors(ContainerStats); err != nil { + return nil, err + } + + stats, err := p.runtimeService.ContainerStats(ctx, req.ContainerId) + if err != nil { + return nil, err + } + return &runtimeapi.ContainerStatsResponse{Stats: stats}, nil +} + +// ListContainerStats returns stats of all running containers. +func (p *RemoteRuntime) ListContainerStats(ctx context.Context, req *runtimeapi.ListContainerStatsRequest) (*runtimeapi.ListContainerStatsResponse, error) { + if err := p.runInjectors(ListContainerStats); err != nil { + return nil, err + } + + stats, err := p.runtimeService.ListContainerStats(ctx, req.Filter) + if err != nil { + return nil, err + } + return &runtimeapi.ListContainerStatsResponse{Stats: stats}, nil +} + +// PodSandboxStats returns stats of the pod. If the pod does not +// exist, the call returns an error. +func (p *RemoteRuntime) PodSandboxStats(ctx context.Context, req *runtimeapi.PodSandboxStatsRequest) (*runtimeapi.PodSandboxStatsResponse, error) { + if err := p.runInjectors(PodSandboxStats); err != nil { + return nil, err + } + + stats, err := p.runtimeService.PodSandboxStats(ctx, req.PodSandboxId) + if err != nil { + return nil, err + } + return &runtimeapi.PodSandboxStatsResponse{Stats: stats}, nil +} + +// ListPodSandboxStats returns stats of all running pods. +func (p *RemoteRuntime) ListPodSandboxStats(ctx context.Context, req *runtimeapi.ListPodSandboxStatsRequest) (*runtimeapi.ListPodSandboxStatsResponse, error) { + if err := p.runInjectors(ListPodSandboxStats); err != nil { + return nil, err + } + + stats, err := p.runtimeService.ListPodSandboxStats(ctx, req.Filter) + if err != nil { + return nil, err + } + return &runtimeapi.ListPodSandboxStatsResponse{Stats: stats}, nil +} + +// UpdateRuntimeConfig updates the runtime configuration based on the given request. +func (p *RemoteRuntime) UpdateRuntimeConfig(ctx context.Context, req *runtimeapi.UpdateRuntimeConfigRequest) (*runtimeapi.UpdateRuntimeConfigResponse, error) { + if err := p.runInjectors(UpdateRuntimeConfig); err != nil { + return nil, err + } + + err := p.runtimeService.UpdateRuntimeConfig(ctx, req.RuntimeConfig) + if err != nil { + return nil, err + } + return &runtimeapi.UpdateRuntimeConfigResponse{}, nil +} + +// Status returns the status of the runtime. +func (p *RemoteRuntime) Status(ctx context.Context, req *runtimeapi.StatusRequest) (*runtimeapi.StatusResponse, error) { + if err := p.runInjectors(Status); err != nil { + return nil, err + } + + resp, err := p.runtimeService.Status(ctx, false) + if err != nil { + return nil, err + } + return resp, nil +} + +// UpdateContainerResources updates ContainerConfig of the container. +func (p *RemoteRuntime) UpdateContainerResources(ctx context.Context, req *runtimeapi.UpdateContainerResourcesRequest) (*runtimeapi.UpdateContainerResourcesResponse, error) { + if err := p.runInjectors(UpdateContainerResources); err != nil { + return nil, err + } + + err := p.runtimeService.UpdateContainerResources(ctx, req.ContainerId, &runtimeapi.ContainerResources{Linux: req.Linux}) + if err != nil { + return nil, err + } + return &runtimeapi.UpdateContainerResourcesResponse{}, nil +} + +// ReopenContainerLog reopens the container log file. +func (p *RemoteRuntime) ReopenContainerLog(ctx context.Context, req *runtimeapi.ReopenContainerLogRequest) (*runtimeapi.ReopenContainerLogResponse, error) { + if err := p.runInjectors(ReopenContainerLog); err != nil { + return nil, err + } + + err := p.runtimeService.ReopenContainerLog(ctx, req.ContainerId) + if err != nil { + return nil, err + } + return &runtimeapi.ReopenContainerLogResponse{}, nil +} + +// CheckpointContainer checkpoints the given container. +func (p *RemoteRuntime) CheckpointContainer(ctx context.Context, req *runtimeapi.CheckpointContainerRequest) (*runtimeapi.CheckpointContainerResponse, error) { + if err := p.runInjectors(CheckpointContainer); err != nil { + return nil, err + } + + err := p.runtimeService.CheckpointContainer(ctx, req) + if err != nil { + return nil, err + } + return &runtimeapi.CheckpointContainerResponse{}, nil +} + +func (p *RemoteRuntime) GetContainerEvents(req *runtimeapi.GetEventsRequest, ces runtimeapi.RuntimeService_GetContainerEventsServer) error { + if err := p.runInjectors(GetContainerEvents); err != nil { + return err + } + + // Capacity of the channel for receiving pod lifecycle events. This number + // is a bit arbitrary and may be adjusted in the future. + plegChannelCapacity := 1000 + containerEventsResponseCh := make(chan *runtimeapi.ContainerEventResponse, plegChannelCapacity) + defer close(containerEventsResponseCh) + + if err := p.runtimeService.GetContainerEvents(context.Background(), containerEventsResponseCh, nil); err != nil { + return err + } + + for event := range containerEventsResponseCh { + if err := ces.Send(event); err != nil { + return status.Errorf(codes.Unknown, "Failed to send event: %v", err) + } + } + + return nil +} + +// ListMetricDescriptors gets the descriptors for the metrics that will be returned in ListPodSandboxMetrics. +func (p *RemoteRuntime) ListMetricDescriptors(ctx context.Context, req *runtimeapi.ListMetricDescriptorsRequest) (*runtimeapi.ListMetricDescriptorsResponse, error) { + if err := p.runInjectors(ListMetricDescriptors); err != nil { + return nil, err + } + + descs, err := p.runtimeService.ListMetricDescriptors(ctx) + if err != nil { + return nil, err + } + return &runtimeapi.ListMetricDescriptorsResponse{Descriptors: descs}, nil +} + +// ListPodSandboxMetrics retrieves the metrics for all pod sandboxes. +func (p *RemoteRuntime) ListPodSandboxMetrics(ctx context.Context, req *runtimeapi.ListPodSandboxMetricsRequest) (*runtimeapi.ListPodSandboxMetricsResponse, error) { + if err := p.runInjectors(ListPodSandboxMetrics); err != nil { + return nil, err + } + + podMetrics, err := p.runtimeService.ListPodSandboxMetrics(ctx) + if err != nil { + return nil, err + } + return &runtimeapi.ListPodSandboxMetricsResponse{PodMetrics: podMetrics}, nil +} + +// RuntimeConfig returns the configuration information of the runtime. +func (p *RemoteRuntime) RuntimeConfig(ctx context.Context, req *runtimeapi.RuntimeConfigRequest) (*runtimeapi.RuntimeConfigResponse, error) { + if err := p.runInjectors(RuntimeConfig); err != nil { + return nil, err + } + + resp, err := p.runtimeService.RuntimeConfig(ctx) + if err != nil { + return nil, err + } + return resp, nil +} diff --git a/test/e2e_node/criproxy_test.go b/test/e2e_node/criproxy_test.go new file mode 100644 index 00000000000..fd200b23086 --- /dev/null +++ b/test/e2e_node/criproxy_test.go @@ -0,0 +1,172 @@ +//go:build linux +// +build linux + +/* +Copyright 2024 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package e2enode + +import ( + "context" + "fmt" + "strings" + "time" + + "github.com/onsi/ginkgo/v2" + "github.com/onsi/gomega" + + v1 "k8s.io/api/core/v1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/apimachinery/pkg/util/uuid" + kubeletevents "k8s.io/kubernetes/pkg/kubelet/events" + "k8s.io/kubernetes/test/e2e/feature" + "k8s.io/kubernetes/test/e2e/framework" + e2epod "k8s.io/kubernetes/test/e2e/framework/pod" + "k8s.io/kubernetes/test/e2e_node/criproxy" + imageutils "k8s.io/kubernetes/test/utils/image" + admissionapi "k8s.io/pod-security-admission/api" +) + +// Examples of using CRI proxy +var _ = SIGDescribe(feature.CriProxy, framework.WithSerial(), func() { + f := framework.NewDefaultFramework("cri-proxy-example") + f.NamespacePodSecurityLevel = admissionapi.LevelPrivileged + + ginkgo.Context("Inject a pull image error exception into the CriProxy", func() { + ginkgo.BeforeEach(func() { + if err := resetCRIProxyInjector(); err != nil { + ginkgo.Skip("Skip the test since the CRI Proxy is undefined.") + } + }) + + ginkgo.AfterEach(func() { + err := resetCRIProxyInjector() + framework.ExpectNoError(err) + }) + + ginkgo.It("Pod failed to start due to an image pull error.", func(ctx context.Context) { + expectedErr := fmt.Errorf("PullImage failed") + err := addCRIProxyInjector(func(apiName string) error { + if apiName == criproxy.PullImage { + return expectedErr + } + return nil + }) + framework.ExpectNoError(err) + + pod := e2epod.NewPodClient(f).Create(ctx, newPullImageAlwaysPod()) + podErr := e2epod.WaitForPodRunningInNamespace(ctx, f.ClientSet, pod) + gomega.Expect(podErr).To(gomega.HaveOccurred()) + + eventMsg, err := getFailedToPullImageMsg(ctx, f, pod.Name) + framework.ExpectNoError(err) + isExpectedErrMsg := strings.Contains(eventMsg, expectedErr.Error()) + gomega.Expect(isExpectedErrMsg).To(gomega.BeTrueBecause("we injected an exception into the PullImage interface of the cri proxy")) + }) + }) + + ginkgo.Context("Inject a pull image timeout exception into the CriProxy", func() { + ginkgo.BeforeEach(func() { + if err := resetCRIProxyInjector(); err != nil { + ginkgo.Skip("Skip the test since the CRI Proxy is undefined.") + } + }) + + ginkgo.AfterEach(func() { + err := resetCRIProxyInjector() + framework.ExpectNoError(err) + }) + + ginkgo.It("Image pull time exceeded 10 seconds", func(ctx context.Context) { + const delayTime = 10 * time.Second + err := addCRIProxyInjector(func(apiName string) error { + if apiName == criproxy.PullImage { + time.Sleep(10 * time.Second) + } + return nil + }) + framework.ExpectNoError(err) + + pod := e2epod.NewPodClient(f).Create(ctx, newPullImageAlwaysPod()) + podErr := e2epod.WaitForPodRunningInNamespace(ctx, f.ClientSet, pod) + framework.ExpectNoError(podErr) + + imagePullDuration, err := getPodImagePullDuration(ctx, f, pod.Name) + framework.ExpectNoError(err) + + gomega.Expect(imagePullDuration).To(gomega.BeNumerically(">=", delayTime), "PullImages should take more than 10 seconds") + }) + }) +}) + +func getFailedToPullImageMsg(ctx context.Context, f *framework.Framework, podName string) (string, error) { + events, err := f.ClientSet.CoreV1().Events(f.Namespace.Name).List(ctx, metav1.ListOptions{}) + if err != nil { + return "", err + } + + for _, event := range events.Items { + if event.Reason == kubeletevents.FailedToPullImage && event.InvolvedObject.Name == podName { + return event.Message, nil + } + } + + return "", fmt.Errorf("failed to find FailedToPullImage event for pod: %s", podName) +} + +func getPodImagePullDuration(ctx context.Context, f *framework.Framework, podName string) (time.Duration, error) { + events, err := f.ClientSet.CoreV1().Events(f.Namespace.Name).List(ctx, metav1.ListOptions{}) + if err != nil { + return 0, err + } + + var startTime, endTime time.Time + for _, event := range events.Items { + if event.InvolvedObject.Name == podName { + switch event.Reason { + case kubeletevents.PullingImage: + startTime = event.FirstTimestamp.Time + case kubeletevents.PulledImage: + endTime = event.FirstTimestamp.Time + } + } + } + + if startTime.IsZero() || endTime.IsZero() { + return 0, fmt.Errorf("failed to find both PullingImage and PulledImage events for pod: %s", podName) + } + + return endTime.Sub(startTime), nil +} + +func newPullImageAlwaysPod() *v1.Pod { + podName := "cri-proxy-test-" + string(uuid.NewUUID()) + pod := &v1.Pod{ + ObjectMeta: metav1.ObjectMeta{ + Name: podName, + }, + Spec: v1.PodSpec{ + Containers: []v1.Container{ + { + Image: imageutils.GetPauseImageName(), + Name: podName, + ImagePullPolicy: v1.PullAlways, + }, + }, + }, + } + return pod +} diff --git a/test/e2e_node/e2e_node_suite_test.go b/test/e2e_node/e2e_node_suite_test.go index 7b75394d3b7..8f289fda700 100644 --- a/test/e2e_node/e2e_node_suite_test.go +++ b/test/e2e_node/e2e_node_suite_test.go @@ -48,6 +48,7 @@ import ( e2eskipper "k8s.io/kubernetes/test/e2e/framework/skipper" e2etestfiles "k8s.io/kubernetes/test/e2e/framework/testfiles" e2etestingmanifests "k8s.io/kubernetes/test/e2e/testing-manifests" + "k8s.io/kubernetes/test/e2e_node/criproxy" "k8s.io/kubernetes/test/e2e_node/services" e2enodetestingmanifests "k8s.io/kubernetes/test/e2e_node/testing-manifests" system "k8s.io/system-validators/validators" @@ -69,7 +70,8 @@ import ( ) var ( - e2es *services.E2EServices + e2eCriProxy *criproxy.RemoteRuntime + e2es *services.E2EServices // featureGates is a map of feature names to bools that enable or disable alpha/experimental features. featureGates map[string]bool // serviceFeatureGates is a map of feature names to bools that enable or @@ -109,6 +111,7 @@ func registerNodeFlags(flags *flag.FlagSet) { flags.Var(cliflag.NewMapStringBool(&featureGates), "feature-gates", "A set of key=value pairs that describe feature gates for alpha/experimental features.") flags.Var(cliflag.NewMapStringBool(&serviceFeatureGates), "service-feature-gates", "A set of key=value pairs that describe feature gates for alpha/experimental features for API service.") flags.BoolVar(&framework.TestContext.StandaloneMode, "standalone-mode", false, "If true, starts kubelet in standalone mode.") + flags.BoolVar(&framework.TestContext.CriProxyEnabled, "cri-proxy-enabled", false, "If true, enable CRI API proxy for failure injection.") } func init() { @@ -242,6 +245,22 @@ var _ = ginkgo.SynchronizedBeforeSuite(func(ctx context.Context) []byte { // We should mask locksmithd when provisioning the machine. maskLocksmithdOnCoreos() + if framework.TestContext.CriProxyEnabled { + framework.Logf("Start cri proxy") + rs, is, err := getCRIClient() + framework.ExpectNoError(err) + + e2eCriProxy = criproxy.NewRemoteRuntimeProxy(rs, is) + endpoint, err := criproxy.GenerateEndpoint() + framework.ExpectNoError(err) + + err = e2eCriProxy.Start(endpoint) + framework.ExpectNoError(err) + + framework.TestContext.ContainerRuntimeEndpoint = endpoint + framework.TestContext.ImageServiceEndpoint = endpoint + } + if *startServices { // If the services are expected to stop after test, they should monitor the test process. // If the services are expected to keep running after test, they should not monitor the test process. @@ -285,6 +304,11 @@ var _ = ginkgo.SynchronizedAfterSuite(func() {}, func() { } } + if e2eCriProxy != nil { + framework.Logf("Stopping cri proxy service...") + e2eCriProxy.Stop() + } + klog.Infof("Tests Finished") }) diff --git a/test/e2e_node/utils_linux.go b/test/e2e_node/utils_linux.go index 8675f766fe9..d9b174e5dae 100644 --- a/test/e2e_node/utils_linux.go +++ b/test/e2e_node/utils_linux.go @@ -20,6 +20,8 @@ limitations under the License. package e2enode import ( + "fmt" + libcontainercgroups "github.com/opencontainers/runc/libcontainer/cgroups" ) @@ -27,3 +29,21 @@ import ( func IsCgroup2UnifiedMode() bool { return libcontainercgroups.IsCgroup2UnifiedMode() } + +// addCRIProxyInjector registers an injector function for the CRIProxy. +func addCRIProxyInjector(injector func(apiName string) error) error { + if e2eCriProxy != nil { + e2eCriProxy.AddInjector(injector) + return nil + } + return fmt.Errorf("failed to add injector because the CRI Proxy is undefined") +} + +// resetCRIProxyInjector resets all injector functions for the CRIProxy. +func resetCRIProxyInjector() error { + if e2eCriProxy != nil { + e2eCriProxy.ResetInjectors() + return nil + } + return fmt.Errorf("failed to reset injector because the CRI Proxy is undefined") +}