diff --git a/pkg/kubelet/dockershim/docker_container.go b/pkg/kubelet/dockershim/docker_container.go
index 01b865f9d2a..3c61e13cf91 100644
--- a/pkg/kubelet/dockershim/docker_container.go
+++ b/pkg/kubelet/dockershim/docker_container.go
@@ -83,6 +83,25 @@ func (ds *dockerService) ListContainers(_ context.Context, r *runtimeapi.ListCon
 	return &runtimeapi.ListContainersResponse{Containers: result}, nil
 }
 
+func (ds *dockerService) getContainerCleanupInfo(containerID string) (*containerCleanupInfo, bool) {
+	ds.cleanupInfosLock.RLock()
+	defer ds.cleanupInfosLock.RUnlock()
+	info, ok := ds.containerCleanupInfos[containerID]
+	return info, ok
+}
+
+func (ds *dockerService) setContainerCleanupInfo(containerID string, info *containerCleanupInfo) {
+	ds.cleanupInfosLock.Lock()
+	defer ds.cleanupInfosLock.Unlock()
+	ds.containerCleanupInfos[containerID] = info
+}
+
+func (ds *dockerService) clearContainerCleanupInfo(containerID string) {
+	ds.cleanupInfosLock.Lock()
+	defer ds.cleanupInfosLock.Unlock()
+	delete(ds.containerCleanupInfos, containerID)
+}
+
 // CreateContainer creates a new container in the given PodSandbox
 // Docker cannot store the log to an arbitrary location (yet), so we create an
 // symlink at LogPath, linking to the actual path of the log.
@@ -185,7 +204,7 @@ func (ds *dockerService) CreateContainer(_ context.Context, r *runtimeapi.Create
 		// we don't perform the clean up just yet at that could destroy information
 		// needed for the container to start (e.g. Windows credentials stored in
 		// registry keys); instead, we'll clean up when the container gets removed
-		ds.containerCleanupInfos[containerID] = cleanupInfo
+		ds.setContainerCleanupInfo(containerID, cleanupInfo)
 	}
 	return &runtimeapi.CreateContainerResponse{ContainerId: containerID}, nil
 }
@@ -461,11 +480,11 @@ func (ds *dockerService) UpdateContainerResources(_ context.Context, r *runtimea
 }
 
 func (ds *dockerService) performPlatformSpecificContainerForContainer(containerID string) (errors []error) {
-	if cleanupInfo, present := ds.containerCleanupInfos[containerID]; present {
+	if cleanupInfo, present := ds.getContainerCleanupInfo(containerID); present {
 		errors = ds.performPlatformSpecificContainerCleanupAndLogErrors(containerID, cleanupInfo)
 
 		if len(errors) == 0 {
-			delete(ds.containerCleanupInfos, containerID)
+			ds.clearContainerCleanupInfo(containerID)
 		}
 	}
 
diff --git a/pkg/kubelet/dockershim/docker_container_test.go b/pkg/kubelet/dockershim/docker_container_test.go
index 61b30a6c996..9ff4195929c 100644
--- a/pkg/kubelet/dockershim/docker_container_test.go
+++ b/pkg/kubelet/dockershim/docker_container_test.go
@@ -23,6 +23,7 @@ import (
 	"fmt"
 	"path/filepath"
 	"strings"
+	"sync"
 	"testing"
 	"time"
 
@@ -56,6 +57,67 @@ func getTestCTX() context.Context {
 	return context.Background()
 }
 
+// TestConcurrentlyCreateAndDeleteContainers is a regression test for #93771, which ensures
+// kubelet would not panic on concurrent writes to `dockerService.containerCleanupInfos`.
+func TestConcurrentlyCreateAndDeleteContainers(t *testing.T) {
+	ds, _, _ := newTestDockerService()
+	podName, namespace := "foo", "bar"
+	containerName, image := "sidecar", "logger"
+
+	const count = 20
+	configs := make([]*runtimeapi.ContainerConfig, 0, count)
+	sConfigs := make([]*runtimeapi.PodSandboxConfig, 0, count)
+	for i := 0; i < count; i++ {
+		s := makeSandboxConfig(fmt.Sprintf("%s%d", podName, i),
+			fmt.Sprintf("%s%d", namespace, i), fmt.Sprintf("%d", i), 0)
+		labels := map[string]string{"concurrent-test": fmt.Sprintf("label%d", i)}
+		c := makeContainerConfig(s, fmt.Sprintf("%s%d", containerName, i),
+			fmt.Sprintf("%s:v%d", image, i), uint32(i), labels, nil)
+		sConfigs = append(sConfigs, s)
+		configs = append(configs, c)
+	}
+
+	containerIDs := make(chan string, len(configs)) // make channel non-blocking to simulate concurrent containers creation
+
+	var (
+		creationWg sync.WaitGroup
+		deletionWg sync.WaitGroup
+	)
+
+	creationWg.Add(len(configs))
+
+	go func() {
+		creationWg.Wait()
+		close(containerIDs)
+	}()
+	for i := range configs {
+		go func(i int) {
+			defer creationWg.Done()
+			// We don't care about the sandbox id; pass a bogus one.
+			sandboxID := fmt.Sprintf("sandboxid%d", i)
+			req := &runtimeapi.CreateContainerRequest{PodSandboxId: sandboxID, Config: configs[i], SandboxConfig: sConfigs[i]}
+			createResp, err := ds.CreateContainer(getTestCTX(), req)
+			if err != nil {
+				t.Errorf("CreateContainer: %v", err)
+				return
+			}
+			containerIDs <- createResp.ContainerId
+		}(i)
+	}
+
+	for containerID := range containerIDs {
+		deletionWg.Add(1)
+		go func(id string) {
+			defer deletionWg.Done()
+			_, err := ds.RemoveContainer(getTestCTX(), &runtimeapi.RemoveContainerRequest{ContainerId: id})
+			if err != nil {
+				t.Errorf("RemoveContainer: %v", err)
+			}
+		}(containerID)
+	}
+	deletionWg.Wait()
+}
+
 // TestListContainers creates several containers and then list them to check
 // whether the correct metadatas, states, and labels are returned.
 func TestListContainers(t *testing.T) {
diff --git a/pkg/kubelet/dockershim/docker_service.go b/pkg/kubelet/dockershim/docker_service.go
index d9665698cc3..74a7b5d0812 100644
--- a/pkg/kubelet/dockershim/docker_service.go
+++ b/pkg/kubelet/dockershim/docker_service.go
@@ -31,7 +31,7 @@ import (
 	dockertypes "github.com/docker/docker/api/types"
 	"k8s.io/klog/v2"
 
-	"k8s.io/api/core/v1"
+	v1 "k8s.io/api/core/v1"
 	runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
 	kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
 	"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
@@ -316,6 +316,7 @@ type dockerService struct {
 	// (see `applyPlatformSpecificDockerConfig` and `performPlatformSpecificContainerCleanup`
 	// methods for more info).
 	containerCleanupInfos map[string]*containerCleanupInfo
+	cleanupInfosLock      sync.RWMutex
 }
 
 // TODO: handle context.