Merge pull request #18172 from Random-Liu/generate-sync-event

Auto commit by PR queue bot
k8s-merge-robot 2016-01-20 22:48:35 -08:00
commit d0f59e2126
4 changed files with 295 additions and 77 deletions

View File

@@ -17,7 +17,6 @@ limitations under the License.
package container
import (
"errors"
"fmt"
"io"
"reflect"
@@ -31,32 +30,6 @@ import (
"k8s.io/kubernetes/pkg/volume"
)
// Container Terminated and Kubelet is backing off the restart
var ErrCrashLoopBackOff = errors.New("CrashLoopBackOff")
var (
// Container image pull failed, kubelet is backing off image pull
ErrImagePullBackOff = errors.New("ImagePullBackOff")
// Unable to inspect image
ErrImageInspect = errors.New("ImageInspectError")
// General image pull error
ErrImagePull = errors.New("ErrImagePull")
// Required Image is absent on host and PullPolicy is NeverPullImage
ErrImageNeverPull = errors.New("ErrImageNeverPull")
// ErrContainerNotFound returned when a container in the given pod with the
// given container name was not found, amongst those managed by the kubelet.
ErrContainerNotFound = errors.New("no matching container")
// Get http error when pulling image from registry
RegistryUnavailable = errors.New("RegistryUnavailable")
)
var ErrRunContainer = errors.New("RunContainerError")
type Version interface {
// Compare compares two versions of the runtime. On success it returns -1
// if the version is less than the other, 1 if it is greater than the other,

View File

@@ -0,0 +1,135 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package container
import (
"errors"
"fmt"
utilerrors "k8s.io/kubernetes/pkg/util/errors"
)
// TODO(random-liu): We need to better organize runtime errors for introspection.
// Container Terminated and Kubelet is backing off the restart
var ErrCrashLoopBackOff = errors.New("CrashLoopBackOff")
var (
// Container image pull failed, kubelet is backing off image pull
ErrImagePullBackOff = errors.New("ImagePullBackOff")
// Unable to inspect image
ErrImageInspect = errors.New("ImageInspectError")
// General image pull error
ErrImagePull = errors.New("ErrImagePull")
// Required Image is absent on host and PullPolicy is NeverPullImage
ErrImageNeverPull = errors.New("ErrImageNeverPull")
// ErrContainerNotFound returned when a container in the given pod with the
// given container name was not found, amongst those managed by the kubelet.
ErrContainerNotFound = errors.New("no matching container")
// Get http error when pulling image from registry
RegistryUnavailable = errors.New("RegistryUnavailable")
)
var (
ErrRunContainer = errors.New("RunContainerError")
ErrKillContainer = errors.New("KillContainerError")
ErrVerifyNonRoot = errors.New("VerifyNonRootError")
)
var (
ErrSetupNetwork = errors.New("SetupNetworkError")
ErrTeardownNetwork = errors.New("TeardownNetworkError")
)
// SyncAction indicates the different kinds of actions in SyncPod() and KillPod(). Currently the only actions
// are starting/killing containers and setting up/tearing down the network.
type SyncAction string
const (
StartContainer SyncAction = "StartContainer"
KillContainer SyncAction = "KillContainer"
SetupNetwork SyncAction = "SetupNetwork"
TeardownNetwork SyncAction = "TeardownNetwork"
)
// SyncResult is the result of a sync action.
type SyncResult struct {
// The associated action of the result
Action SyncAction
// The target of the action; currently the target can only be:
// * Container: the target should be the container name
// * Network: the target is unused for now; we just set it to the pod full name
Target interface{}
// Brief error reason
Error error
// Human readable error reason
Message string
}
// NewSyncResult generates a new SyncResult with the specified Action and Target
func NewSyncResult(action SyncAction, target interface{}) *SyncResult {
return &SyncResult{Action: action, Target: target}
}
// Fail fails the SyncResult with the specified error and message
func (r *SyncResult) Fail(err error, msg string) {
r.Error, r.Message = err, msg
}
// PodSyncResult is the summary result of SyncPod() and KillPod()
type PodSyncResult struct {
// Result of different sync actions
SyncResults []*SyncResult
// Error encountered in SyncPod() and KillPod() that is not already included in SyncResults
SyncError error
}
// AddSyncResult adds one or more SyncResults to the current PodSyncResult
func (p *PodSyncResult) AddSyncResult(result ...*SyncResult) {
p.SyncResults = append(p.SyncResults, result...)
}
// AddPodSyncResult merges a PodSyncResult into the current one
func (p *PodSyncResult) AddPodSyncResult(result PodSyncResult) {
p.AddSyncResult(result.SyncResults...)
p.SyncError = result.SyncError
}
// Fail fails the PodSyncResult with an error that occurred in SyncPod() or KillPod() itself
func (p *PodSyncResult) Fail(err error) {
p.SyncError = err
}
// Error returns an error summarizing all the errors in PodSyncResult
func (p *PodSyncResult) Error() error {
errlist := []error{}
if p.SyncError != nil {
errlist = append(errlist, fmt.Errorf("failed to SyncPod: %v\n", p.SyncError))
}
for _, result := range p.SyncResults {
if result.Error != nil {
errlist = append(errlist, fmt.Errorf("failed to %q for %q with %v: %q\n", result.Action, result.Target,
result.Error, result.Message))
}
}
return utilerrors.NewAggregate(errlist)
}
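Taken together, SyncResult and PodSyncResult let every action taken during a pod sync be recorded individually and then aggregated into a single error. A minimal usage sketch follows (assuming the package import path k8s.io/kubernetes/pkg/kubelet/container; the pod and container names are made up):

package main

import (
	"errors"
	"fmt"

	kubecontainer "k8s.io/kubernetes/pkg/kubelet/container"
)

func main() {
	var result kubecontainer.PodSyncResult

	// A successful action only records the Action and the Target.
	result.AddSyncResult(kubecontainer.NewSyncResult(kubecontainer.SetupNetwork, "default/mypod"))

	// A failed action records a brief error plus a human readable message via Fail.
	startApp := kubecontainer.NewSyncResult(kubecontainer.StartContainer, "app")
	startApp.Fail(kubecontainer.ErrRunContainer, "container command not found")
	result.AddSyncResult(startApp)

	// Error() aggregates every failed action (and SyncError, if set) into one error.
	fmt.Println(result.Error())

	// A failure outside any single action is recorded on the pod result itself.
	result.Fail(errors.New("unexpected runtime failure"))
	fmt.Println(result.Error())
}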

View File

@@ -0,0 +1,68 @@
/*
Copyright 2015 The Kubernetes Authors All rights reserved.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package container
import (
"errors"
"testing"
)
func TestPodSyncResult(t *testing.T) {
okResults := []*SyncResult{
NewSyncResult(StartContainer, "container_0"),
NewSyncResult(SetupNetwork, "pod"),
}
errResults := []*SyncResult{
NewSyncResult(KillContainer, "container_1"),
NewSyncResult(TeardownNetwork, "pod"),
}
errResults[0].Fail(errors.New("error_0"), "message_0")
errResults[1].Fail(errors.New("error_1"), "message_1")
// If the PodSyncResult doesn't contain any error result, it should not be an error
result := PodSyncResult{}
result.AddSyncResult(okResults...)
if result.Error() != nil {
t.Errorf("PodSyncResult should not be error: %v", result)
}
// If the PodSyncResult contains an error result, it should be an error
result = PodSyncResult{}
result.AddSyncResult(okResults...)
result.AddSyncResult(errResults...)
if result.Error() == nil {
t.Errorf("PodSyncResult should be error: %q", result)
}
// If the PodSyncResult is failed, it should be an error
result = PodSyncResult{}
result.AddSyncResult(okResults...)
result.Fail(errors.New("error"))
if result.Error() == nil {
t.Errorf("PodSyncResult should be error: %q", result)
}
// If an error PodSyncResult is merged in, the result should be an error
errResult := PodSyncResult{}
errResult.AddSyncResult(errResults...)
result = PodSyncResult{}
result.AddSyncResult(okResults...)
result.AddPodSyncResult(errResult)
if result.Error() == nil {
t.Errorf("PodSyncResult should be error: %q", result)
}
}

View File

@@ -1224,16 +1224,23 @@ func (dm *DockerManager) GetContainerIP(containerID, interfaceName string) (stri
// TODO(random-liu): After using pod status for KillPod(), we can also remove the kubernetesPodLabel, because all the needed information should have
// been extracted from the new labels and stored in the pod status.
func (dm *DockerManager) KillPod(pod *api.Pod, runningPod kubecontainer.Pod) error {
// Send the kills in parallel since they may take a long time. Len + 1 since there
// can be Len errors + the networkPlugin teardown error.
errs := make(chan error, len(runningPod.Containers)+1)
result := dm.killPodWithSyncResult(pod, runningPod)
return result.Error()
}
// TODO(random-liu): This is just a temporary function, will be removed when we actually add PodSyncResult
// NOTE(random-liu): The pod passed in could be *nil* when kubelet restarted.
func (dm *DockerManager) killPodWithSyncResult(pod *api.Pod, runningPod kubecontainer.Pod) (result kubecontainer.PodSyncResult) {
// Send the kills in parallel since they may take a long time.
// There may be len(runningPod.Containers) or len(runningPod.Containers)-1 results in the channel
containerResults := make(chan *kubecontainer.SyncResult, len(runningPod.Containers))
wg := sync.WaitGroup{}
var (
networkContainer *kubecontainer.Container
networkSpec *api.Container
)
wg.Add(len(runningPod.Containers))
for _, container := range runningPod.Containers {
wg.Add(1)
go func(container *kubecontainer.Container) {
defer util.HandleCrash()
defer wg.Done()
@@ -1258,33 +1265,36 @@ func (dm *DockerManager) KillPod(pod *api.Pod, runningPod kubecontainer.Pod) err
return
}
killContainerResult := kubecontainer.NewSyncResult(kubecontainer.KillContainer, container.Name)
err := dm.KillContainerInPod(container.ID, containerSpec, pod, "Need to kill pod.")
if err != nil {
killContainerResult.Fail(kubecontainer.ErrKillContainer, err.Error())
glog.Errorf("Failed to delete container: %v; Skipping pod %q", err, runningPod.ID)
errs <- err
}
containerResults <- killContainerResult
}(container)
}
wg.Wait()
close(containerResults)
for containerResult := range containerResults {
result.AddSyncResult(containerResult)
}
if networkContainer != nil {
teardownNetworkResult := kubecontainer.NewSyncResult(kubecontainer.TeardownNetwork, kubecontainer.BuildPodFullName(runningPod.Name, runningPod.Namespace))
result.AddSyncResult(teardownNetworkResult)
if err := dm.networkPlugin.TearDownPod(runningPod.Namespace, runningPod.Name, kubecontainer.DockerID(networkContainer.ID.ID)); err != nil {
glog.Errorf("Failed tearing down the infra container: %v", err)
errs <- err
message := fmt.Sprintf("Failed to teardown network for pod %q using network plugins %q: %v", runningPod.ID, dm.networkPlugin.Name(), err)
teardownNetworkResult.Fail(kubecontainer.ErrTeardownNetwork, message)
glog.Error(message)
}
killContainerResult := kubecontainer.NewSyncResult(kubecontainer.KillContainer, networkContainer.Name)
result.AddSyncResult(killContainerResult)
if err := dm.KillContainerInPod(networkContainer.ID, networkSpec, pod, "Need to kill pod."); err != nil {
killContainerResult.Fail(kubecontainer.ErrKillContainer, err.Error())
glog.Errorf("Failed to delete container: %v; Skipping pod %q", err, runningPod.ID)
errs <- err
}
}
close(errs)
if len(errs) > 0 {
errList := []error{}
for err := range errs {
errList = append(errList, err)
}
return fmt.Errorf("failed to delete containers (%v)", errList)
}
return nil
return
}
// KillContainerInPod kills a container in the pod. It must be passed either a container ID or a container and pod,
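killPodWithSyncResult fans the container kills out to goroutines and collects one SyncResult per container through a buffered channel, so a slow or failing kill never blocks the others. A simplified, standalone sketch of that collection pattern (stand-in types and a stubbed kill, not the kubelet ones):

package main

import (
	"fmt"
	"sync"
)

// syncResult is a minimal stand-in for kubecontainer.SyncResult.
type syncResult struct {
	target string
	err    error
}

// killContainer stands in for KillContainerInPod; here it pretends the kill succeeds.
func killContainer(name string) error {
	return nil
}

func main() {
	containers := []string{"app", "sidecar", "logger"}

	// Buffer the channel so every goroutine can send its result without blocking.
	results := make(chan *syncResult, len(containers))
	var wg sync.WaitGroup
	wg.Add(len(containers))
	for _, c := range containers {
		go func(name string) {
			defer wg.Done()
			r := &syncResult{target: name}
			if err := killContainer(name); err != nil {
				r.err = err
			}
			results <- r
		}(c)
	}
	// Wait for all kills to finish, then close the channel so the collection loop ends.
	wg.Wait()
	close(results)
	for r := range results {
		fmt.Printf("killed %q, err=%v\n", r.target, r.err)
	}
}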
@@ -1560,7 +1570,8 @@ func appendToFile(filePath, stringToAppend string) error {
}
// createPodInfraContainer starts the pod infra container for a pod. Returns the docker container ID of the newly created container.
func (dm *DockerManager) createPodInfraContainer(pod *api.Pod) (kubecontainer.DockerID, error) {
// If any error occurs in this function, it will return a brief error and a detailed error message.
func (dm *DockerManager) createPodInfraContainer(pod *api.Pod) (kubecontainer.DockerID, error, string) {
start := time.Now()
defer func() {
metrics.ContainerManagerLatency.WithLabelValues("createPodInfraContainer").Observe(metrics.SinceInMicroseconds(start))
@@ -1592,17 +1603,17 @@ func (dm *DockerManager) createPodInfraContainer(pod *api.Pod) (kubecontainer.Do
// No pod secrets for the infra container.
// The message isn't needed for the infra container
if err, _ := dm.imagePuller.PullImage(pod, container, nil); err != nil {
return "", err
if err, msg := dm.imagePuller.PullImage(pod, container, nil); err != nil {
return "", err, msg
}
// Currently we don't care about restart count of infra container, just set it to 0.
id, err := dm.runContainerInPod(pod, container, netNamespace, getIPCMode(pod), getPidMode(pod), 0)
if err != nil {
return "", err
return "", kubecontainer.ErrRunContainer, err.Error()
}
return kubecontainer.DockerID(id.ID), nil
return kubecontainer.DockerID(id.ID), nil, ""
}
// Structure keeping information on changes that need to happen for a pod. The semantics is as follows:
@@ -1746,7 +1757,13 @@ func (dm *DockerManager) clearReasonCache(pod *api.Pod, container *api.Container
}
// Sync the running pod to match the specified desired pod.
func (dm *DockerManager) SyncPod(pod *api.Pod, _ api.PodStatus, podStatus *kubecontainer.PodStatus, pullSecrets []api.Secret, backOff *util.Backoff) error {
func (dm *DockerManager) SyncPod(pod *api.Pod, apiPodStatus api.PodStatus, podStatus *kubecontainer.PodStatus, pullSecrets []api.Secret, backOff *util.Backoff) error {
result := dm.syncPodWithSyncResult(pod, apiPodStatus, podStatus, pullSecrets, backOff)
return result.Error()
}
// (random-liu) This is just a temporary function, will be removed when we actually add PodSyncEvent
func (dm *DockerManager) syncPodWithSyncResult(pod *api.Pod, _ api.PodStatus, podStatus *kubecontainer.PodStatus, pullSecrets []api.Secret, backOff *util.Backoff) (result kubecontainer.PodSyncResult) {
start := time.Now()
defer func() {
metrics.ContainerManagerLatency.WithLabelValues("SyncPod").Observe(metrics.SinceInMicroseconds(start))
@@ -1754,7 +1771,8 @@ func (dm *DockerManager) SyncPod(pod *api.Pod, _ api.PodStatus, podStatus *kubec
containerChanges, err := dm.computePodContainerChanges(pod, podStatus)
if err != nil {
return err
result.Fail(err)
return
}
glog.V(3).Infof("Got container changes for pod %q: %+v", format.Pod(pod), containerChanges)
@@ -1774,8 +1792,10 @@ func (dm *DockerManager) SyncPod(pod *api.Pod, _ api.PodStatus, podStatus *kubec
// Killing phase: if we want to start new infra container, or nothing is running kill everything (including infra container)
// TODO(random-liu): We'll use pod status directly in the future
if err := dm.KillPod(pod, kubecontainer.ConvertPodStatusToRunningPod(podStatus)); err != nil {
return err
killResult := dm.killPodWithSyncResult(pod, kubecontainer.ConvertPodStatusToRunningPod(podStatus))
result.AddPodSyncResult(killResult)
if killResult.Error() != nil {
return
}
} else {
// Otherwise kill any running containers in this pod which are not specified as ones to keep.
@@ -1783,8 +1803,7 @@ func (dm *DockerManager) SyncPod(pod *api.Pod, _ api.PodStatus, podStatus *kubec
for _, containerStatus := range runningContainerStatues {
_, keep := containerChanges.ContainersToKeep[kubecontainer.DockerID(containerStatus.ID.ID)]
if !keep {
// NOTE(random-liu): Just log ID or log container status here?
glog.V(3).Infof("Killing unwanted container %+v", containerStatus)
glog.V(3).Infof("Killing unwanted container %q(id=%q) for pod %q", containerStatus.Name, containerStatus.ID, format.Pod(pod))
// attempt to find the appropriate container policy
var podContainer *api.Container
var killMessage string
@@ -1795,9 +1814,12 @@ func (dm *DockerManager) SyncPod(pod *api.Pod, _ api.PodStatus, podStatus *kubec
break
}
}
killContainerResult := kubecontainer.NewSyncResult(kubecontainer.KillContainer, containerStatus.Name)
result.AddSyncResult(killContainerResult)
if err := dm.KillContainerInPod(containerStatus.ID, podContainer, pod, killMessage); err != nil {
glog.Errorf("Error killing container: %v", err)
return err
killContainerResult.Fail(kubecontainer.ErrKillContainer, err.Error())
glog.Errorf("Error killing container %q(id=%q) for pod %q: %v", containerStatus.Name, containerStatus.ID, format.Pod(pod), err)
return
}
}
}
@@ -1807,31 +1829,45 @@ func (dm *DockerManager) SyncPod(pod *api.Pod, _ api.PodStatus, podStatus *kubec
podInfraContainerID := containerChanges.InfraContainerId
if containerChanges.StartInfraContainer && (len(containerChanges.ContainersToStart) > 0) {
glog.V(4).Infof("Creating pod infra container for %q", format.Pod(pod))
podInfraContainerID, err = dm.createPodInfraContainer(pod)
startContainerResult := kubecontainer.NewSyncResult(kubecontainer.StartContainer, PodInfraContainerName)
result.AddSyncResult(startContainerResult)
var msg string
podInfraContainerID, err, msg = dm.createPodInfraContainer(pod)
if err != nil {
startContainerResult.Fail(err, msg)
glog.Errorf("Failed to create pod infra container: %v; Skipping pod %q", err, format.Pod(pod))
return err
return
}
setupNetworkResult := kubecontainer.NewSyncResult(kubecontainer.SetupNetwork, kubecontainer.GetPodFullName(pod))
result.AddSyncResult(setupNetworkResult)
// Call the networking plugin
err = dm.networkPlugin.SetUpPod(pod.Namespace, pod.Name, podInfraContainerID)
if err != nil {
message := fmt.Sprintf("Failed to setup networking for pod %q using network plugins: %v; Skipping pod", format.Pod(pod), err)
// TODO(random-liu): There shouldn't be "Skipping pod" in the sync result message
message := fmt.Sprintf("Failed to setup network for pod %q using network plugins %q: %v; Skipping pod", format.Pod(pod), dm.networkPlugin.Name(), err)
setupNetworkResult.Fail(kubecontainer.ErrSetupNetwork, message)
glog.Error(message)
// Delete infra container
killContainerResult := kubecontainer.NewSyncResult(kubecontainer.KillContainer, PodInfraContainerName)
result.AddSyncResult(killContainerResult)
if delErr := dm.KillContainerInPod(kubecontainer.ContainerID{
ID: string(podInfraContainerID),
Type: "docker"}, nil, pod, message); delErr != nil {
killContainerResult.Fail(kubecontainer.ErrKillContainer, delErr.Error())
glog.Warningf("Clear infra container failed for pod %q: %v", format.Pod(pod), delErr)
}
return err
return
}
// Setup the host interface unless the pod is on the host's network (FIXME: move to networkPlugin when ready)
podInfraContainer, err := dm.client.InspectContainer(string(podInfraContainerID))
var podInfraContainer *docker.Container
podInfraContainer, err = dm.client.InspectContainer(string(podInfraContainerID))
if err != nil {
glog.Errorf("Failed to inspect pod infra container: %v; Skipping pod %q", err, format.Pod(pod))
return err
result.Fail(err)
return
}
if !(pod.Spec.SecurityContext != nil && pod.Spec.SecurityContext.HostNetwork) {
if err = hairpin.SetUpContainer(podInfraContainer.State.Pid, "eth0"); err != nil {
@@ -1844,28 +1880,35 @@ func (dm *DockerManager) SyncPod(pod *api.Pod, _ api.PodStatus, podStatus *kubec
pod.Status.PodIP = dm.determineContainerIP(pod.Name, pod.Namespace, podInfraContainer)
}
containersStarted := 0
// Start everything
for idx := range containerChanges.ContainersToStart {
container := &pod.Spec.Containers[idx]
startContainerResult := kubecontainer.NewSyncResult(kubecontainer.StartContainer, container.Name)
result.AddSyncResult(startContainerResult)
// containerChanges.StartInfraContainer causes the containers to be restarted for config reasons
// ignore backoff
if !containerChanges.StartInfraContainer && dm.doBackOff(pod, container, podStatus, backOff) {
if !containerChanges.StartInfraContainer {
isInBackOff, err, msg := dm.doBackOff(pod, container, podStatus, backOff)
if isInBackOff {
startContainerResult.Fail(err, msg)
glog.V(4).Infof("Backing Off restarting container %+v in pod %v", container, format.Pod(pod))
continue
}
}
glog.V(4).Infof("Creating container %+v in pod %v", container, format.Pod(pod))
err, msg := dm.imagePuller.PullImage(pod, container, pullSecrets)
if err != nil {
startContainerResult.Fail(err, msg)
dm.updateReasonCache(pod, container, err.Error(), errors.New(msg))
continue
}
if container.SecurityContext != nil && container.SecurityContext.RunAsNonRoot != nil && *container.SecurityContext.RunAsNonRoot {
err := dm.verifyNonRoot(container)
dm.updateReasonCache(pod, container, "VerifyNonRootError", err)
dm.updateReasonCache(pod, container, kubecontainer.ErrVerifyNonRoot.Error(), err)
if err != nil {
startContainerResult.Fail(kubecontainer.ErrVerifyNonRoot, err.Error())
glog.Errorf("Error running pod %q container %q: %v", format.Pod(pod), container.Name, err)
continue
}
@@ -1886,20 +1929,16 @@ func (dm *DockerManager) SyncPod(pod *api.Pod, _ api.PodStatus, podStatus *kubec
_, err = dm.runContainerInPod(pod, container, namespaceMode, namespaceMode, getPidMode(pod), restartCount)
dm.updateReasonCache(pod, container, kubecontainer.ErrRunContainer.Error(), err)
if err != nil {
startContainerResult.Fail(kubecontainer.ErrRunContainer, err.Error())
// TODO(bburns) : Perhaps blacklist a container after N failures?
glog.Errorf("Error running pod %q container %q: %v", format.Pod(pod), container.Name, err)
continue
}
containersStarted++
// Successfully started the container; clear the entry in the failure
// reason cache.
dm.clearReasonCache(pod, container)
}
if containersStarted != len(containerChanges.ContainersToStart) {
return fmt.Errorf("not all containers have started: %d != %d", containersStarted, len(containerChanges.ContainersToStart))
}
return nil
return
}
// verifyNonRoot returns an error if the container or image will run as the root user.
@@ -1961,7 +2000,10 @@ func getUidFromUser(id string) string {
return id
}
func (dm *DockerManager) doBackOff(pod *api.Pod, container *api.Container, podStatus *kubecontainer.PodStatus, backOff *util.Backoff) bool {
// If all instances of a container are garbage collected, doBackOff will also return false, which means the container may be restarted before the
// backoff deadline. However, because that won't cause an error and the chance is really slim, we can just ignore it for now.
// If a container is still in backoff, the function will return a brief backoff error and a detailed error message.
func (dm *DockerManager) doBackOff(pod *api.Pod, container *api.Container, podStatus *kubecontainer.PodStatus, backOff *util.Backoff) (bool, error, string) {
containerStatus := podStatus.FindContainerStatusByName(container.Name)
if containerStatus != nil && containerStatus.State == kubecontainer.ContainerStateExited && !containerStatus.FinishedAt.IsZero() {
ts := containerStatus.FinishedAt
@@ -1979,13 +2021,13 @@ func (dm *DockerManager) doBackOff(pod *api.Pod, container *api.Container, podSt
err := fmt.Errorf("Back-off %s restarting failed container=%s pod=%s", backOff.Get(stableName), container.Name, format.Pod(pod))
dm.updateReasonCache(pod, container, kubecontainer.ErrCrashLoopBackOff.Error(), err)
glog.Infof("%s", err.Error())
return true
return true, kubecontainer.ErrCrashLoopBackOff, err.Error()
}
backOff.Next(stableName, ts)
}
dm.clearReasonCache(pod, container)
return false
return false, nil, ""
}
// getPidMode returns the pid mode to use on the docker container based on pod.Spec.HostPID.
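For intuition, the back-off check in doBackOff amounts to: if the container exited recently and the current back-off delay has not yet elapsed since that exit, refuse to restart it. A toy, self-contained illustration follows; the real code goes through util.Backoff, which also grows the delay on each Next call, and that bookkeeping is elided here.

package main

import (
	"fmt"
	"time"
)

// inBackOff reports whether a restart should still be refused: the container
// exited at finishedAt and the current back-off delay has not yet elapsed.
func inBackOff(finishedAt time.Time, delay time.Duration, now time.Time) bool {
	return now.Before(finishedAt.Add(delay))
}

func main() {
	exitedAt := time.Now().Add(-3 * time.Second)
	for _, delay := range []time.Duration{time.Second, 10 * time.Second} {
		fmt.Printf("delay=%v inBackOff=%v\n", delay, inBackOff(exitedAt, delay, time.Now()))
	}
}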