Merge pull request #93773 from knight42/fix/kubelet-concurrent-map-writes

fix(kubelet): protect `containerCleanupInfos` from concurrent map writes
This commit is contained in:
Kubernetes Prow Robot 2020-09-01 18:43:27 -07:00 committed by GitHub
commit db10d8c942
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 86 additions and 4 deletions

View File

@ -83,6 +83,25 @@ func (ds *dockerService) ListContainers(_ context.Context, r *runtimeapi.ListCon
return &runtimeapi.ListContainersResponse{Containers: result}, nil
}
// getContainerCleanupInfo looks up the cleanup info recorded for containerID,
// returning the entry (or nil) along with a flag reporting whether one was
// present. It takes a read lock, so it is safe for concurrent use.
func (ds *dockerService) getContainerCleanupInfo(containerID string) (*containerCleanupInfo, bool) {
	ds.cleanupInfosLock.RLock()
	cleanupInfo, found := ds.containerCleanupInfos[containerID]
	ds.cleanupInfosLock.RUnlock()
	return cleanupInfo, found
}
// setContainerCleanupInfo records cleanup info for containerID, replacing any
// existing entry. It takes the write lock, so it is safe for concurrent use.
func (ds *dockerService) setContainerCleanupInfo(containerID string, info *containerCleanupInfo) {
	ds.cleanupInfosLock.Lock()
	ds.containerCleanupInfos[containerID] = info
	ds.cleanupInfosLock.Unlock()
}
// clearContainerCleanupInfo drops the cleanup-info entry for containerID, if
// any. It takes the write lock, so it is safe for concurrent use; deleting a
// missing key is a no-op.
func (ds *dockerService) clearContainerCleanupInfo(containerID string) {
	ds.cleanupInfosLock.Lock()
	delete(ds.containerCleanupInfos, containerID)
	ds.cleanupInfosLock.Unlock()
}
// CreateContainer creates a new container in the given PodSandbox
// Docker cannot store the log to an arbitrary location (yet), so we create an
// symlink at LogPath, linking to the actual path of the log.
@ -185,7 +204,7 @@ func (ds *dockerService) CreateContainer(_ context.Context, r *runtimeapi.Create
// we don't perform the clean up just yet as that could destroy information
// needed for the container to start (e.g. Windows credentials stored in
// registry keys); instead, we'll clean up when the container gets removed
ds.containerCleanupInfos[containerID] = cleanupInfo
ds.setContainerCleanupInfo(containerID, cleanupInfo)
}
return &runtimeapi.CreateContainerResponse{ContainerId: containerID}, nil
}
@ -461,11 +480,11 @@ func (ds *dockerService) UpdateContainerResources(_ context.Context, r *runtimea
}
func (ds *dockerService) performPlatformSpecificContainerForContainer(containerID string) (errors []error) {
if cleanupInfo, present := ds.containerCleanupInfos[containerID]; present {
if cleanupInfo, present := ds.getContainerCleanupInfo(containerID); present {
errors = ds.performPlatformSpecificContainerCleanupAndLogErrors(containerID, cleanupInfo)
if len(errors) == 0 {
delete(ds.containerCleanupInfos, containerID)
ds.clearContainerCleanupInfo(containerID)
}
}

View File

@ -23,6 +23,7 @@ import (
"fmt"
"path/filepath"
"strings"
"sync"
"testing"
"time"
@ -56,6 +57,67 @@ func getTestCTX() context.Context {
return context.Background()
}
// TestConcurrentlyCreateAndDeleteContainers is a regression test for #93771, which ensures
// kubelet would not panic on concurrent writes to `dockerService.containerCleanupInfos`.
// TestConcurrentlyCreateAndDeleteContainers is a regression test for #93771, which ensures
// kubelet would not panic on concurrent writes to `dockerService.containerCleanupInfos`.
// Run with -race to catch the underlying data race, not just the map-write panic.
func TestConcurrentlyCreateAndDeleteContainers(t *testing.T) {
	ds, _, _ := newTestDockerService()
	podName, namespace := "foo", "bar"
	containerName, image := "sidecar", "logger"

	// Build `count` distinct sandbox/container config pairs up front, so the
	// goroutines below do nothing but exercise the create/remove paths.
	const count = 20
	configs := make([]*runtimeapi.ContainerConfig, 0, count)
	sConfigs := make([]*runtimeapi.PodSandboxConfig, 0, count)
	for i := 0; i < count; i++ {
		s := makeSandboxConfig(fmt.Sprintf("%s%d", podName, i),
			fmt.Sprintf("%s%d", namespace, i), fmt.Sprintf("%d", i), 0)
		labels := map[string]string{"concurrent-test": fmt.Sprintf("label%d", i)}
		c := makeContainerConfig(s, fmt.Sprintf("%s%d", containerName, i),
			fmt.Sprintf("%s:v%d", image, i), uint32(i), labels, nil)
		sConfigs = append(sConfigs, s)
		configs = append(configs, c)
	}

	containerIDs := make(chan string, len(configs)) // make channel non-blocking to simulate concurrent containers creation

	var (
		creationWg sync.WaitGroup
		deletionWg sync.WaitGroup
	)

	creationWg.Add(len(configs))

	// Close containerIDs only after every creator goroutine has finished, so
	// the `range containerIDs` loop below terminates once all IDs are drained.
	go func() {
		creationWg.Wait()
		close(containerIDs)
	}()
	for i := range configs {
		// i is passed as an argument to avoid the shared-loop-variable capture
		// pitfall in pre-1.22 Go.
		go func(i int) {
			defer creationWg.Done()
			// We don't care about the sandbox id; pass a bogus one.
			sandboxID := fmt.Sprintf("sandboxid%d", i)
			req := &runtimeapi.CreateContainerRequest{PodSandboxId: sandboxID, Config: configs[i], SandboxConfig: sConfigs[i]}
			createResp, err := ds.CreateContainer(getTestCTX(), req)
			if err != nil {
				// t.Errorf (unlike t.Fatalf) may be called from a non-test
				// goroutine, per the testing package docs.
				t.Errorf("CreateContainer: %v", err)
				return
			}
			containerIDs <- createResp.ContainerId
		}(i)
	}

	// Removals start as soon as each ID arrives, while other creations are
	// still in flight — the concurrent read/write mix that triggered #93771.
	for containerID := range containerIDs {
		deletionWg.Add(1)
		go func(id string) {
			defer deletionWg.Done()
			_, err := ds.RemoveContainer(getTestCTX(), &runtimeapi.RemoveContainerRequest{ContainerId: id})
			if err != nil {
				t.Errorf("RemoveContainer: %v", err)
			}
		}(containerID)
	}
	// Wait for all deleters before returning, so no goroutine outlives the test.
	deletionWg.Wait()
}
// TestListContainers creates several containers and then list them to check
// whether the correct metadata, states, and labels are returned.
func TestListContainers(t *testing.T) {

View File

@ -31,7 +31,7 @@ import (
dockertypes "github.com/docker/docker/api/types"
"k8s.io/klog/v2"
"k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
runtimeapi "k8s.io/cri-api/pkg/apis/runtime/v1alpha2"
kubeletconfig "k8s.io/kubernetes/pkg/kubelet/apis/config"
"k8s.io/kubernetes/pkg/kubelet/checkpointmanager"
@ -316,6 +316,7 @@ type dockerService struct {
// (see `applyPlatformSpecificDockerConfig` and `performPlatformSpecificContainerCleanup`
// methods for more info).
containerCleanupInfos map[string]*containerCleanupInfo
cleanupInfosLock sync.RWMutex
}
// TODO: handle context.