Merge pull request #41434 from freehan/cri-kubenet-error

Automatic merge from submit-queue (batch tested with PRs 41531, 40417, 41434)

[CRI] beef up network teardown in StopPodSandbox

1. Added CheckpointNotFoundError to allow dockershim to conduct error handling
2. Retry network teardown if it fails

ref: https://github.com/kubernetes/kubernetes/issues/41225
This commit is contained in:
Kubernetes Submit Queue 2017-02-15 23:01:09 -08:00 committed by GitHub
commit 11bf535e03
10 changed files with 112 additions and 24 deletions

View File

@ -34,6 +34,7 @@ go_library(
"//pkg/kubelet/cm:go_default_library", "//pkg/kubelet/cm:go_default_library",
"//pkg/kubelet/container:go_default_library", "//pkg/kubelet/container:go_default_library",
"//pkg/kubelet/dockershim/cm:go_default_library", "//pkg/kubelet/dockershim/cm:go_default_library",
"//pkg/kubelet/dockershim/errors:go_default_library",
"//pkg/kubelet/dockertools:go_default_library", "//pkg/kubelet/dockertools:go_default_library",
"//pkg/kubelet/dockertools/securitycontext:go_default_library", "//pkg/kubelet/dockertools/securitycontext:go_default_library",
"//pkg/kubelet/leaky:go_default_library", "//pkg/kubelet/leaky:go_default_library",
@ -83,6 +84,7 @@ go_test(
"//pkg/kubelet/api/v1alpha1/runtime:go_default_library", "//pkg/kubelet/api/v1alpha1/runtime:go_default_library",
"//pkg/kubelet/container:go_default_library", "//pkg/kubelet/container:go_default_library",
"//pkg/kubelet/container/testing:go_default_library", "//pkg/kubelet/container/testing:go_default_library",
"//pkg/kubelet/dockershim/errors:go_default_library",
"//pkg/kubelet/dockershim/testing:go_default_library", "//pkg/kubelet/dockershim/testing:go_default_library",
"//pkg/kubelet/dockertools:go_default_library", "//pkg/kubelet/dockertools:go_default_library",
"//pkg/kubelet/dockertools/securitycontext:go_default_library", "//pkg/kubelet/dockertools/securitycontext:go_default_library",
@ -113,6 +115,7 @@ filegroup(
srcs = [ srcs = [
":package-srcs", ":package-srcs",
"//pkg/kubelet/dockershim/cm:all-srcs", "//pkg/kubelet/dockershim/cm:all-srcs",
"//pkg/kubelet/dockershim/errors:all-srcs",
"//pkg/kubelet/dockershim/remote:all-srcs", "//pkg/kubelet/dockershim/remote:all-srcs",
"//pkg/kubelet/dockershim/testing:all-srcs", "//pkg/kubelet/dockershim/testing:all-srcs",
], ],

View File

@ -23,6 +23,8 @@ import (
"path/filepath" "path/filepath"
"regexp" "regexp"
"strings" "strings"
"k8s.io/kubernetes/pkg/kubelet/dockershim/errors"
) )
const ( const (
@ -40,6 +42,7 @@ type CheckpointStore interface {
// Write persists a checkpoint with key // Write persists a checkpoint with key
Write(key string, data []byte) error Write(key string, data []byte) error
// Read retrieves a checkpoint with key // Read retrieves a checkpoint with key
// Read must return CheckpointNotFoundError if checkpoint is not found
Read(key string) ([]byte, error) Read(key string) ([]byte, error)
// Delete deletes a checkpoint with key // Delete deletes a checkpoint with key
// Delete must not return error if checkpoint does not exist // Delete must not return error if checkpoint does not exist
@ -75,7 +78,11 @@ func (fstore *FileStore) Read(key string) ([]byte, error) {
if err := validateKey(key); err != nil { if err := validateKey(key); err != nil {
return nil, err return nil, err
} }
return ioutil.ReadFile(fstore.getCheckpointPath(key)) bytes, err := ioutil.ReadFile(fstore.getCheckpointPath(key))
if os.IsNotExist(err) {
return bytes, errors.CheckpointNotFoundError
}
return bytes, err
} }
func (fstore *FileStore) Delete(key string) error { func (fstore *FileStore) Delete(key string) error {

View File

@ -23,6 +23,7 @@ import (
"testing" "testing"
"github.com/stretchr/testify/assert" "github.com/stretchr/testify/assert"
"k8s.io/kubernetes/pkg/kubelet/dockershim/errors"
) )
func TestFileStore(t *testing.T) { func TestFileStore(t *testing.T) {
@ -102,7 +103,7 @@ func TestFileStore(t *testing.T) {
err = store.Delete(c.key) err = store.Delete(c.key)
assert.NoError(t, err) assert.NoError(t, err)
_, err = store.Read(c.key) _, err = store.Read(c.key)
assert.Error(t, err) assert.EqualValues(t, errors.CheckpointNotFoundError, err)
} }
// Test delete non existed checkpoint // Test delete non existed checkpoint

View File

@ -23,6 +23,7 @@ import (
"path/filepath" "path/filepath"
"github.com/golang/glog" "github.com/golang/glog"
"k8s.io/kubernetes/pkg/kubelet/dockershim/errors"
hashutil "k8s.io/kubernetes/pkg/util/hash" hashutil "k8s.io/kubernetes/pkg/util/hash"
) )
@ -34,8 +35,6 @@ const (
schemaVersion = "v1" schemaVersion = "v1"
) )
var CorruptCheckpointError = fmt.Errorf("checkpoint is corrupted.")
type Protocol string type Protocol string
// PortMapping is the port mapping configurations of a sandbox. // PortMapping is the port mapping configurations of a sandbox.
@ -108,11 +107,11 @@ func (handler *PersistentCheckpointHandler) GetCheckpoint(podSandboxID string) (
err = json.Unmarshal(blob, &checkpoint) err = json.Unmarshal(blob, &checkpoint)
if err != nil { if err != nil {
glog.Errorf("Failed to unmarshal checkpoint %q. Checkpoint content: %q. ErrMsg: %v", podSandboxID, string(blob), err) glog.Errorf("Failed to unmarshal checkpoint %q. Checkpoint content: %q. ErrMsg: %v", podSandboxID, string(blob), err)
return &checkpoint, CorruptCheckpointError return &checkpoint, errors.CorruptCheckpointError
} }
if checkpoint.CheckSum != calculateChecksum(checkpoint) { if checkpoint.CheckSum != calculateChecksum(checkpoint) {
glog.Errorf("Checksum of checkpoint %q is not valid", podSandboxID) glog.Errorf("Checksum of checkpoint %q is not valid", podSandboxID)
return &checkpoint, CorruptCheckpointError return &checkpoint, errors.CorruptCheckpointError
} }
return &checkpoint, nil return &checkpoint, nil
} }

View File

@ -27,6 +27,7 @@ import (
utilerrors "k8s.io/apimachinery/pkg/util/errors" utilerrors "k8s.io/apimachinery/pkg/util/errors"
runtimeapi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime" runtimeapi "k8s.io/kubernetes/pkg/kubelet/api/v1alpha1/runtime"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/dockershim/errors"
"k8s.io/kubernetes/pkg/kubelet/dockertools" "k8s.io/kubernetes/pkg/kubelet/dockertools"
"k8s.io/kubernetes/pkg/kubelet/qos" "k8s.io/kubernetes/pkg/kubelet/qos"
"k8s.io/kubernetes/pkg/kubelet/types" "k8s.io/kubernetes/pkg/kubelet/types"
@ -114,8 +115,10 @@ func (ds *dockerService) RunPodSandbox(config *runtimeapi.PodSandboxConfig) (str
// after us? // after us?
func (ds *dockerService) StopPodSandbox(podSandboxID string) error { func (ds *dockerService) StopPodSandbox(podSandboxID string) error {
var namespace, name string var namespace, name string
var checkpointErr, statusErr error
needNetworkTearDown := false needNetworkTearDown := false
// Try to retrieve sandbox information from docker daemon or sandbox checkpoint
status, statusErr := ds.PodSandboxStatus(podSandboxID) status, statusErr := ds.PodSandboxStatus(podSandboxID)
if statusErr == nil { if statusErr == nil {
nsOpts := status.GetLinux().GetNamespaces().GetOptions() nsOpts := status.GetLinux().GetNamespaces().GetOptions()
@ -124,36 +127,53 @@ func (ds *dockerService) StopPodSandbox(podSandboxID string) error {
namespace = m.Namespace namespace = m.Namespace
name = m.Name name = m.Name
} else { } else {
checkpoint, err := ds.checkpointHandler.GetCheckpoint(podSandboxID) var checkpoint *PodSandboxCheckpoint
if err != nil { checkpoint, checkpointErr = ds.checkpointHandler.GetCheckpoint(podSandboxID)
glog.Errorf("Failed to get checkpoint for sandbox %q: %v", podSandboxID, err)
return fmt.Errorf("failed to get sandbox status: %v", statusErr) // Proceed if both sandbox container and checkpoint could not be found. This means that following
// actions will only have sandbox ID and not have pod namespace and name information.
// Return an error if any other, unexpected error is encountered.
if checkpointErr != nil {
if dockertools.IsContainerNotFoundError(statusErr) && checkpointErr == errors.CheckpointNotFoundError {
glog.Warningf("Both sandbox container and checkpoint for id %q could not be found. "+
"Proceed without further sandbox information.", podSandboxID)
} else {
return utilerrors.NewAggregate([]error{
fmt.Errorf("failed to get checkpoint for sandbox %q: %v", podSandboxID, checkpointErr),
fmt.Errorf("failed to get sandbox status: %v", statusErr)})
} }
} else {
namespace = checkpoint.Namespace namespace = checkpoint.Namespace
name = checkpoint.Name name = checkpoint.Name
}
// Always trigger network plugin to tear down // Always trigger network plugin to tear down
needNetworkTearDown = true needNetworkTearDown = true
} }
// WARNING: The following operations make the following assumptions:
// 1. kubelet will retry on any error returned by StopPodSandbox.
// 2. tearing down network and stopping sandbox container can succeed in any sequence.
// This depends on the implementation detail of network plugin and proper error handling.
// For kubenet, if tearing down network failed and sandbox container is stopped, kubelet
// will retry. On retry, kubenet will not be able to retrieve network namespace of the sandbox
// since it is stopped. With empty network namespace, CNI bridge plugin will conduct best
// effort clean up and will not return error.
errList := []error{}
if needNetworkTearDown { if needNetworkTearDown {
cID := kubecontainer.BuildContainerID(runtimeName, podSandboxID) cID := kubecontainer.BuildContainerID(runtimeName, podSandboxID)
if err := ds.networkPlugin.TearDownPod(namespace, name, cID); err != nil { if err := ds.networkPlugin.TearDownPod(namespace, name, cID); err != nil {
// TODO: Figure out a way to retry this error. We can't errList = append(errList, fmt.Errorf("failed to teardown sandbox %q for pod %s/%s: %v", podSandboxID, namespace, name, err))
// right now because the plugin throws errors when it doesn't find
// eth0, which might not exist for various reasons (setup failed,
// conf changed etc). In theory, it should teardown everything else
// so there's no need to retry.
glog.Errorf("Failed to teardown sandbox %q for pod %s/%s: %v", podSandboxID, namespace, name, err)
} }
} }
if err := ds.client.StopContainer(podSandboxID, defaultSandboxGracePeriod); err != nil { if err := ds.client.StopContainer(podSandboxID, defaultSandboxGracePeriod); err != nil {
glog.Errorf("Failed to stop sandbox %q: %v", podSandboxID, err) glog.Errorf("Failed to stop sandbox %q: %v", podSandboxID, err)
// Do not return error if the container does not exist // Do not return error if the container does not exist
if !dockertools.IsContainerNotFoundError(err) { if !dockertools.IsContainerNotFoundError(err) {
return err errList = append(errList, err)
} }
} }
return nil return utilerrors.NewAggregate(errList)
// TODO: Stop all running containers in the sandbox. // TODO: Stop all running containers in the sandbox.
} }
@ -357,7 +377,7 @@ func (ds *dockerService) ListPodSandbox(filter *runtimeapi.PodSandboxFilter) ([]
if err != nil { if err != nil {
glog.Errorf("Failed to retrieve checkpoint for sandbox %q: %v", id, err) glog.Errorf("Failed to retrieve checkpoint for sandbox %q: %v", id, err)
if err == CorruptCheckpointError { if err == errors.CorruptCheckpointError {
glog.V(2).Info("Removing corrupted checkpoint %q: %+v", id, *checkpoint) glog.V(2).Info("Removing corrupted checkpoint %q: %+v", id, *checkpoint)
ds.checkpointHandler.RemoveCheckpoint(id) ds.checkpointHandler.RemoveCheckpoint(id)
} }

View File

@ -32,6 +32,7 @@ import (
kubecm "k8s.io/kubernetes/pkg/kubelet/cm" kubecm "k8s.io/kubernetes/pkg/kubelet/cm"
kubecontainer "k8s.io/kubernetes/pkg/kubelet/container" kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
"k8s.io/kubernetes/pkg/kubelet/dockershim/cm" "k8s.io/kubernetes/pkg/kubelet/dockershim/cm"
"k8s.io/kubernetes/pkg/kubelet/dockershim/errors"
"k8s.io/kubernetes/pkg/kubelet/dockertools" "k8s.io/kubernetes/pkg/kubelet/dockertools"
"k8s.io/kubernetes/pkg/kubelet/network" "k8s.io/kubernetes/pkg/kubelet/network"
"k8s.io/kubernetes/pkg/kubelet/network/cni" "k8s.io/kubernetes/pkg/kubelet/network/cni"
@ -292,9 +293,15 @@ func (ds *dockerService) GetNetNS(podSandboxID string) (string, error) {
func (ds *dockerService) GetPodPortMappings(podSandboxID string) ([]*hostport.PortMapping, error) { func (ds *dockerService) GetPodPortMappings(podSandboxID string) ([]*hostport.PortMapping, error) {
// TODO: get portmappings from docker labels for backward compatibility // TODO: get portmappings from docker labels for backward compatibility
checkpoint, err := ds.checkpointHandler.GetCheckpoint(podSandboxID) checkpoint, err := ds.checkpointHandler.GetCheckpoint(podSandboxID)
// Return empty portMappings if checkpoint is not found
if err != nil { if err != nil {
if err == errors.CheckpointNotFoundError {
glog.Warningf("Failed to retrieve checkpoint for sandbox %q: %v", err)
return nil, nil
} else {
return nil, err return nil, err
} }
}
portMappings := []*hostport.PortMapping{} portMappings := []*hostport.PortMapping{}
for _, pm := range checkpoint.Data.PortMappings { for _, pm := range checkpoint.Data.PortMappings {

View File

@ -0,0 +1,27 @@
package(default_visibility = ["//visibility:public"])
licenses(["notice"])
load(
"@io_bazel_rules_go//go:def.bzl",
"go_library",
)
go_library(
name = "go_default_library",
srcs = ["errors.go"],
tags = ["automanaged"],
)
filegroup(
name = "package-srcs",
srcs = glob(["**"]),
tags = ["automanaged"],
visibility = ["//visibility:private"],
)
filegroup(
name = "all-srcs",
srcs = [":package-srcs"],
tags = ["automanaged"],
)

View File

@ -0,0 +1,22 @@
/*
Copyright 2017 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package errors
import "fmt"
// Sentinel errors shared by the dockershim checkpoint code. Callers compare
// against these values directly (err == errors.XxxError), so they must stay
// package-level singletons.
var (
	// CorruptCheckpointError is returned when a stored checkpoint cannot be
	// unmarshalled or its checksum does not match its content.
	CorruptCheckpointError = fmt.Errorf("checkpoint is corrupted.")

	// CheckpointNotFoundError is returned by a checkpoint store's Read when
	// no checkpoint exists for the requested key.
	CheckpointNotFoundError = fmt.Errorf("checkpoint is not found.")
)

View File

@ -11,6 +11,7 @@ go_library(
name = "go_default_library", name = "go_default_library",
srcs = ["util.go"], srcs = ["util.go"],
tags = ["automanaged"], tags = ["automanaged"],
deps = ["//pkg/kubelet/dockershim/errors:go_default_library"],
) )
filegroup( filegroup(

View File

@ -17,8 +17,9 @@ limitations under the License.
package testing package testing
import ( import (
"fmt"
"sync" "sync"
"k8s.io/kubernetes/pkg/kubelet/dockershim/errors"
) )
// MemStore is an implementation of CheckpointStore interface which stores checkpoint in memory. // MemStore is an implementation of CheckpointStore interface which stores checkpoint in memory.
@ -43,7 +44,7 @@ func (mstore *MemStore) Read(key string) ([]byte, error) {
defer mstore.Unlock() defer mstore.Unlock()
data, ok := mstore.mem[key] data, ok := mstore.mem[key]
if !ok { if !ok {
return nil, fmt.Errorf("checkpoint %q could not be found", key) return nil, errors.CheckpointNotFoundError
} }
return data, nil return data, nil
} }