mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-30 15:05:27 +00:00
Merge pull request #5401 from yujuhong/syncstatic
Sync static pods from Kubelet to the API server
This commit is contained in:
commit
6b9b2a5ef1
@ -19,6 +19,7 @@ limitations under the License.
|
||||
package main
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"fmt"
|
||||
"io/ioutil"
|
||||
"net"
|
||||
@ -34,13 +35,14 @@ import (
|
||||
|
||||
kubeletapp "github.com/GoogleCloudPlatform/kubernetes/cmd/kubelet/app"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
|
||||
apierrors "github.com/GoogleCloudPlatform/kubernetes/pkg/api/errors"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/testapi"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
|
||||
nodeControllerPkg "github.com/GoogleCloudPlatform/kubernetes/pkg/cloudprovider/controller"
|
||||
replicationControllerPkg "github.com/GoogleCloudPlatform/kubernetes/pkg/controller"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/cadvisor"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/volume/empty_dir"
|
||||
@ -118,7 +120,7 @@ func (h *delegateHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
|
||||
w.WriteHeader(http.StatusNotFound)
|
||||
}
|
||||
|
||||
func startComponents(manifestURL string) (apiServerURL string) {
|
||||
func startComponents(manifestURL string) (string, string) {
|
||||
// Setup
|
||||
servers := []string{}
|
||||
glog.Infof("Creating etcd client pointing to %v", servers)
|
||||
@ -215,21 +217,24 @@ func startComponents(manifestURL string) (apiServerURL string) {
|
||||
cadvisorInterface := new(cadvisor.Fake)
|
||||
|
||||
// Kubelet (localhost)
|
||||
testRootDir := makeTempDirOrDie("kubelet_integ_1.")
|
||||
testRootDir := makeTempDirOrDie("kubelet_integ_1.", "")
|
||||
configFilePath := makeTempDirOrDie("config", testRootDir)
|
||||
glog.Infof("Using %s as root dir for kubelet #1", testRootDir)
|
||||
kubeletapp.SimpleRunKubelet(cl, &fakeDocker1, machineList[0], testRootDir, manifestURL, "127.0.0.1", 10250, api.NamespaceDefault, empty_dir.ProbeVolumePlugins(), nil, cadvisorInterface)
|
||||
kubeletapp.SimpleRunKubelet(cl, &fakeDocker1, machineList[0], testRootDir, manifestURL, "127.0.0.1", 10250, api.NamespaceDefault, empty_dir.ProbeVolumePlugins(), nil, cadvisorInterface, configFilePath)
|
||||
// Kubelet (machine)
|
||||
// Create a second kubelet so that the guestbook example's two redis slaves both
|
||||
// have a place they can schedule.
|
||||
testRootDir = makeTempDirOrDie("kubelet_integ_2.")
|
||||
testRootDir = makeTempDirOrDie("kubelet_integ_2.", "")
|
||||
glog.Infof("Using %s as root dir for kubelet #2", testRootDir)
|
||||
kubeletapp.SimpleRunKubelet(cl, &fakeDocker2, machineList[1], testRootDir, "", "127.0.0.1", 10251, api.NamespaceDefault, empty_dir.ProbeVolumePlugins(), nil, cadvisorInterface)
|
||||
|
||||
return apiServer.URL
|
||||
kubeletapp.SimpleRunKubelet(cl, &fakeDocker2, machineList[1], testRootDir, "", "127.0.0.1", 10251, api.NamespaceDefault, empty_dir.ProbeVolumePlugins(), nil, cadvisorInterface, "")
|
||||
return apiServer.URL, configFilePath
|
||||
}
|
||||
|
||||
func makeTempDirOrDie(prefix string) string {
|
||||
tempDir, err := ioutil.TempDir("/tmp", prefix)
|
||||
func makeTempDirOrDie(prefix string, baseDir string) string {
|
||||
if baseDir == "" {
|
||||
baseDir = "/tmp"
|
||||
}
|
||||
tempDir, err := ioutil.TempDir(baseDir, prefix)
|
||||
if err != nil {
|
||||
glog.Fatalf("Can't make a temp rootdir: %v", err)
|
||||
}
|
||||
@ -278,6 +283,60 @@ func podExists(c *client.Client, podNamespace string, podID string) wait.Conditi
|
||||
}
|
||||
}
|
||||
|
||||
func podNotFound(c *client.Client, podNamespace string, podID string) wait.ConditionFunc {
|
||||
return func() (bool, error) {
|
||||
_, err := c.Pods(podNamespace).Get(podID)
|
||||
return apierrors.IsNotFound(err), nil
|
||||
}
|
||||
}
|
||||
|
||||
func podRunning(c *client.Client, podNamespace string, podID string) wait.ConditionFunc {
|
||||
return func() (bool, error) {
|
||||
pod, err := c.Pods(podNamespace).Get(podID)
|
||||
if err != nil {
|
||||
return false, err
|
||||
}
|
||||
if pod.Status.Phase != api.PodRunning {
|
||||
return false, errors.New(fmt.Sprintf("Pod status is %q", pod.Status.Phase))
|
||||
}
|
||||
return true, nil
|
||||
}
|
||||
}
|
||||
|
||||
func runStaticPodTest(c *client.Client, configFilePath string) {
|
||||
manifest := `version: v1beta2
|
||||
id: static-pod
|
||||
containers:
|
||||
- name: static-container
|
||||
image: kubernetes/pause`
|
||||
|
||||
manifestFile, err := ioutil.TempFile(configFilePath, "")
|
||||
defer os.Remove(manifestFile.Name())
|
||||
ioutil.WriteFile(manifestFile.Name(), []byte(manifest), 0600)
|
||||
|
||||
// Wait for the mirror pod to be created.
|
||||
hostname, _ := os.Hostname()
|
||||
podName := fmt.Sprintf("static-pod-%s", hostname)
|
||||
namespace := kubelet.NamespaceDefault
|
||||
if err := wait.Poll(time.Second, time.Second*30,
|
||||
podRunning(c, namespace, podName)); err != nil {
|
||||
glog.Fatalf("FAILED: mirror pod has not been created or is not running: %v", err)
|
||||
}
|
||||
// Delete the mirror pod, and wait for it to be recreated.
|
||||
c.Pods(namespace).Delete(podName)
|
||||
if err = wait.Poll(time.Second, time.Second*30,
|
||||
podRunning(c, namespace, podName)); err != nil {
|
||||
glog.Fatalf("FAILED: mirror pod has not been re-created or is not running: %v", err)
|
||||
}
|
||||
// Remove the manifest file, and wait for the mirror pod to be deleted.
|
||||
os.Remove(manifestFile.Name())
|
||||
if err = wait.Poll(time.Second, time.Second*30,
|
||||
podNotFound(c, namespace, podName)); err != nil {
|
||||
glog.Fatalf("FAILED: mirror pod has not been deleted: %v", err)
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
func runReplicationControllerTest(c *client.Client) {
|
||||
data, err := ioutil.ReadFile("cmd/integration/controller.json")
|
||||
if err != nil {
|
||||
@ -447,7 +506,7 @@ func runAtomicPutTest(c *client.Client) {
|
||||
glog.Infof("Posting update (%s, %s)", l, v)
|
||||
err = c.Put().Resource("services").Name(svc.Name).Body(&tmpSvc).Do().Error()
|
||||
if err != nil {
|
||||
if errors.IsConflict(err) {
|
||||
if apierrors.IsConflict(err) {
|
||||
glog.Infof("Conflict: (%s, %s)", l, v)
|
||||
// This is what we expect.
|
||||
continue
|
||||
@ -733,7 +792,7 @@ func main() {
|
||||
|
||||
manifestURL := ServeCachedManifestFile()
|
||||
|
||||
apiServerURL := startComponents(manifestURL)
|
||||
apiServerURL, configFilePath := startComponents(manifestURL)
|
||||
|
||||
// Ok. we're good to go.
|
||||
glog.Infof("API Server started on %s", apiServerURL)
|
||||
@ -754,6 +813,9 @@ func main() {
|
||||
runSelfLinkTestOnNamespace(c, "")
|
||||
runSelfLinkTestOnNamespace(c, "other")
|
||||
},
|
||||
func(c *client.Client) {
|
||||
runStaticPodTest(c, configFilePath)
|
||||
},
|
||||
}
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(len(testFuncs))
|
||||
@ -782,11 +844,15 @@ func main() {
|
||||
createdConts.Insert(p[:n-8])
|
||||
}
|
||||
}
|
||||
// We expect 9: 2 infra containers + 2 containers from the replication controller +
|
||||
// 1 infra container + 2 containers from the URL +
|
||||
// 1 infra container + 1 container from the service test.
|
||||
if len(createdConts) != 9 {
|
||||
glog.Fatalf("Expected 9 containers; got %v\n\nlist of created containers:\n\n%#v\n\nDocker 1 Created:\n\n%#v\n\nDocker 2 Created:\n\n%#v\n\n", len(createdConts), createdConts.List(), fakeDocker1.Created, fakeDocker2.Created)
|
||||
// We expect 9: 2 pod infra containers + 2 pods from the replication controller +
|
||||
// 1 pod infra container + 2 pods from the URL +
|
||||
// 1 pod infra container + 1 pod from the service test.
|
||||
// In addition, runStaticPodTest creates 1 pod infra containers +
|
||||
// 1 pod container from the mainfest file
|
||||
// The total number of container created is 11
|
||||
|
||||
if len(createdConts) != 11 {
|
||||
glog.Fatalf("Expected 11 containers; got %v\n\nlist of created containers:\n\n%#v\n\nDocker 1 Created:\n\n%#v\n\nDocker 2 Created:\n\n%#v\n\n", len(createdConts), createdConts.List(), fakeDocker1.Created, fakeDocker2.Created)
|
||||
}
|
||||
glog.Infof("OK - found created containers: %#v", createdConts.List())
|
||||
}
|
||||
|
@ -244,7 +244,8 @@ func SimpleRunKubelet(client *client.Client,
|
||||
masterServiceNamespace string,
|
||||
volumePlugins []volume.Plugin,
|
||||
tlsOptions *kubelet.TLSOptions,
|
||||
cadvisorInterface cadvisor.Interface) {
|
||||
cadvisorInterface cadvisor.Interface,
|
||||
configFilePath string) {
|
||||
kcfg := KubeletConfig{
|
||||
KubeClient: client,
|
||||
DockerClient: dockerClient,
|
||||
@ -264,6 +265,7 @@ func SimpleRunKubelet(client *client.Client,
|
||||
VolumePlugins: volumePlugins,
|
||||
TLSOptions: tlsOptions,
|
||||
CadvisorInterface: cadvisorInterface,
|
||||
ConfigFile: configFilePath,
|
||||
}
|
||||
RunKubelet(&kcfg)
|
||||
}
|
||||
|
@ -151,7 +151,7 @@ func startComponents(etcdClient tools.EtcdClient, cl *client.Client, addr net.IP
|
||||
if err != nil {
|
||||
glog.Fatalf("Failed to create cAdvisor: %v", err)
|
||||
}
|
||||
kubeletapp.SimpleRunKubelet(cl, dockerClient, machineList[0], "/tmp/kubernetes", "", "127.0.0.1", 10250, *masterServiceNamespace, kubeletapp.ProbeVolumePlugins(), nil, cadvisorInterface)
|
||||
kubeletapp.SimpleRunKubelet(cl, dockerClient, machineList[0], "/tmp/kubernetes", "", "127.0.0.1", 10250, *masterServiceNamespace, kubeletapp.ProbeVolumePlugins(), nil, cadvisorInterface, "")
|
||||
}
|
||||
|
||||
func newApiClient(addr net.IP, port int) *client.Client {
|
||||
|
@ -206,7 +206,7 @@ func extractFromFile(filename string) (api.Pod, error) {
|
||||
if glog.V(4) {
|
||||
glog.Infof("Got pod from file %q: %#v", filename, pod)
|
||||
} else {
|
||||
glog.V(1).Infof("Got pod from file %q: %s.%s (%s)", filename, pod.Namespace, pod.Name, pod.UID)
|
||||
glog.V(5).Infof("Got pod from file %q: %s.%s (%s)", filename, pod.Namespace, pod.Name, pod.UID)
|
||||
}
|
||||
return pod, nil
|
||||
}
|
||||
|
@ -78,19 +78,23 @@ const (
|
||||
)
|
||||
|
||||
var (
|
||||
// ErrNoKubeletContainers returned when there are not containers managed by the kubelet (ie: either no containers on the node, or none that the kubelet cares about).
|
||||
// ErrNoKubeletContainers returned when there are not containers managed by
|
||||
// the kubelet (ie: either no containers on the node, or none that the kubelet cares about).
|
||||
ErrNoKubeletContainers = errors.New("no containers managed by kubelet")
|
||||
|
||||
// ErrContainerNotFound returned when a container in the given pod with the given container name was not found, amongst those managed by the kubelet.
|
||||
// ErrContainerNotFound returned when a container in the given pod with the
|
||||
// given container name was not found, amongst those managed by the kubelet.
|
||||
ErrContainerNotFound = errors.New("no matching container")
|
||||
)
|
||||
|
||||
// SyncHandler is an interface implemented by Kubelet, for testability
|
||||
type SyncHandler interface {
|
||||
|
||||
// Syncs current state to match the specified pods. SyncPodType specified what
|
||||
// type of sync is occuring per pod. StartTime specifies the time at which
|
||||
// syncing began (for use in monitoring).
|
||||
SyncPods(pods []api.Pod, podSyncTypes map[types.UID]metrics.SyncPodType, startTime time.Time) error
|
||||
SyncPods(pods []api.Pod, podSyncTypes map[types.UID]metrics.SyncPodType, mirrorPods util.StringSet,
|
||||
startTime time.Time) error
|
||||
}
|
||||
|
||||
type SourcesReadyFn func() bool
|
||||
@ -206,6 +210,8 @@ func NewMainKubelet(
|
||||
|
||||
klet.podStatuses = make(map[string]api.PodStatus)
|
||||
|
||||
klet.mirrorManager = newBasicMirrorManager(klet.kubeClient)
|
||||
|
||||
return klet, nil
|
||||
}
|
||||
|
||||
@ -235,6 +241,11 @@ type Kubelet struct {
|
||||
// pods are immutable
|
||||
podLock sync.RWMutex
|
||||
pods []api.Pod
|
||||
// Record the set of mirror pods (see mirror_manager.go for more details);
|
||||
// similar to pods, this is not immutable and is protected by the same podLock.
|
||||
// Note that Kubelet.pods do not contain mirror pods as they are filtered
|
||||
// out beforehand.
|
||||
mirrorPods util.StringSet
|
||||
|
||||
// Needed to report events for containers belonging to deleted/modified pods.
|
||||
// Tracks references for reporting events
|
||||
@ -288,6 +299,9 @@ type Kubelet struct {
|
||||
// A pod status cache currently used to store rejected pods and their statuses.
|
||||
podStatusesLock sync.RWMutex
|
||||
podStatuses map[string]api.PodStatus
|
||||
|
||||
// A mirror pod manager which provides helper functions.
|
||||
mirrorManager mirrorManager
|
||||
}
|
||||
|
||||
// getRootDir returns the full path to the directory under which kubelet can
|
||||
@ -1240,7 +1254,7 @@ type podContainerChangesSpec struct {
|
||||
containersToKeep map[dockertools.DockerID]int
|
||||
}
|
||||
|
||||
func (kl *Kubelet) computePodContainerChanges(pod *api.Pod, containersInPod dockertools.DockerContainers) (podContainerChangesSpec, error) {
|
||||
func (kl *Kubelet) computePodContainerChanges(pod *api.Pod, hasMirrorPod bool, containersInPod dockertools.DockerContainers) (podContainerChangesSpec, error) {
|
||||
podFullName := GetPodFullName(pod)
|
||||
uid := pod.UID
|
||||
glog.V(4).Infof("Syncing Pod %+v, podFullName: %q, uid: %q", pod, podFullName, uid)
|
||||
@ -1343,10 +1357,10 @@ func (kl *Kubelet) computePodContainerChanges(pod *api.Pod, containersInPod dock
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (kl *Kubelet) syncPod(pod *api.Pod, containersInPod dockertools.DockerContainers) error {
|
||||
func (kl *Kubelet) syncPod(pod *api.Pod, hasMirrorPod bool, containersInPod dockertools.DockerContainers) error {
|
||||
podFullName := GetPodFullName(pod)
|
||||
uid := pod.UID
|
||||
containerChanges, err := kl.computePodContainerChanges(pod, containersInPod)
|
||||
containerChanges, err := kl.computePodContainerChanges(pod, hasMirrorPod, containersInPod)
|
||||
glog.V(3).Infof("Got container changes for pod %q: %+v", podFullName, containerChanges)
|
||||
if err != nil {
|
||||
return err
|
||||
@ -1416,6 +1430,13 @@ func (kl *Kubelet) syncPod(pod *api.Pod, containersInPod dockertools.DockerConta
|
||||
kl.pullImageAndRunContainer(pod, &pod.Spec.Containers[container], &podVolumes, podInfraContainerID)
|
||||
}
|
||||
|
||||
if !hasMirrorPod && isStaticPod(pod) {
|
||||
glog.V(4).Infof("Creating a mirror pod %q", podFullName)
|
||||
if err := kl.mirrorManager.CreateMirrorPod(*pod, kl.hostname); err != nil {
|
||||
glog.Errorf("Failed creating a mirror pod %q: %#v", podFullName, err)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -1496,7 +1517,7 @@ func (kl *Kubelet) cleanupOrphanedVolumes(pods []api.Pod, running []*docker.Cont
|
||||
}
|
||||
|
||||
// SyncPods synchronizes the configured list of pods (desired state) with the host current state.
|
||||
func (kl *Kubelet) SyncPods(allPods []api.Pod, podSyncTypes map[types.UID]metrics.SyncPodType, start time.Time) error {
|
||||
func (kl *Kubelet) SyncPods(allPods []api.Pod, podSyncTypes map[types.UID]metrics.SyncPodType, mirrorPods util.StringSet, start time.Time) error {
|
||||
defer func() {
|
||||
metrics.SyncPodsLatency.Observe(metrics.SinceInMicroseconds(start))
|
||||
}()
|
||||
@ -1543,7 +1564,7 @@ func (kl *Kubelet) SyncPods(allPods []api.Pod, podSyncTypes map[types.UID]metric
|
||||
}
|
||||
|
||||
// Run the sync in an async manifest worker.
|
||||
kl.podWorkers.UpdatePod(pod, func() {
|
||||
kl.podWorkers.UpdatePod(pod, kl.mirrorPods.Has(podFullName), func() {
|
||||
metrics.SyncPodLatency.WithLabelValues(podSyncTypes[pod.UID].String()).Observe(metrics.SinceInMicroseconds(start))
|
||||
})
|
||||
|
||||
@ -1604,6 +1625,9 @@ func (kl *Kubelet) SyncPods(allPods []api.Pod, podSyncTypes map[types.UID]metric
|
||||
return err
|
||||
}
|
||||
|
||||
// Remove any orphaned mirror pods.
|
||||
deleteOrphanedMirrorPods(pods, mirrorPods, kl.mirrorManager)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
@ -1704,18 +1728,18 @@ func (kl *Kubelet) syncLoop(updates <-chan PodUpdate, handler SyncHandler) {
|
||||
}
|
||||
}
|
||||
|
||||
pods, err := kl.GetPods()
|
||||
pods, mirrorPods, err := kl.GetPods()
|
||||
if err != nil {
|
||||
glog.Errorf("Failed to get bound pods.")
|
||||
return
|
||||
}
|
||||
if err := handler.SyncPods(pods, podSyncTypes, start); err != nil {
|
||||
if err := handler.SyncPods(pods, podSyncTypes, mirrorPods, start); err != nil {
|
||||
glog.Errorf("Couldn't sync containers: %v", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Updated the Kubelet's internal pods with those provided by the update.
|
||||
// Update the Kubelet's internal pods with those provided by the update.
|
||||
// Records new and updated pods in newPods and updatedPods.
|
||||
func (kl *Kubelet) updatePods(u PodUpdate, podSyncTypes map[types.UID]metrics.SyncPodType) {
|
||||
kl.podLock.Lock()
|
||||
@ -1723,6 +1747,7 @@ func (kl *Kubelet) updatePods(u PodUpdate, podSyncTypes map[types.UID]metrics.Sy
|
||||
switch u.Op {
|
||||
case SET:
|
||||
glog.V(3).Infof("SET: Containers changed")
|
||||
newPods, newMirrorPods := filterAndCategorizePods(u.Pods)
|
||||
|
||||
// Store the new pods. Don't worry about filtering host ports since those
|
||||
// pods will never be looked up.
|
||||
@ -1730,13 +1755,15 @@ func (kl *Kubelet) updatePods(u PodUpdate, podSyncTypes map[types.UID]metrics.Sy
|
||||
for i := range kl.pods {
|
||||
existingPods[kl.pods[i].UID] = struct{}{}
|
||||
}
|
||||
for i := range u.Pods {
|
||||
if _, ok := existingPods[u.Pods[i].UID]; !ok {
|
||||
podSyncTypes[u.Pods[i].UID] = metrics.SyncPodCreate
|
||||
for _, pod := range newPods {
|
||||
if _, ok := existingPods[pod.UID]; !ok {
|
||||
podSyncTypes[pod.UID] = metrics.SyncPodCreate
|
||||
}
|
||||
}
|
||||
// Actually update the pods.
|
||||
kl.pods = newPods
|
||||
kl.mirrorPods = newMirrorPods
|
||||
|
||||
kl.pods = u.Pods
|
||||
kl.handleHostPortConflicts(kl.pods)
|
||||
case UPDATE:
|
||||
glog.V(3).Infof("Update: Containers changed")
|
||||
@ -1746,7 +1773,8 @@ func (kl *Kubelet) updatePods(u PodUpdate, podSyncTypes map[types.UID]metrics.Sy
|
||||
for i := range u.Pods {
|
||||
podSyncTypes[u.Pods[i].UID] = metrics.SyncPodUpdate
|
||||
}
|
||||
kl.pods = updatePods(u.Pods, kl.pods)
|
||||
allPods := updatePods(u.Pods, kl.pods)
|
||||
kl.pods, kl.mirrorPods = filterAndCategorizePods(allPods)
|
||||
kl.handleHostPortConflicts(kl.pods)
|
||||
default:
|
||||
panic("syncLoop does not support incremental changes")
|
||||
@ -1818,11 +1846,12 @@ func (kl *Kubelet) GetHostname() string {
|
||||
return kl.hostname
|
||||
}
|
||||
|
||||
// GetPods returns all pods bound to the kubelet and their spec.
|
||||
func (kl *Kubelet) GetPods() ([]api.Pod, error) {
|
||||
// GetPods returns all pods bound to the kubelet and their spec, and the mirror
|
||||
// pod map.
|
||||
func (kl *Kubelet) GetPods() ([]api.Pod, util.StringSet, error) {
|
||||
kl.podLock.RLock()
|
||||
defer kl.podLock.RUnlock()
|
||||
return append([]api.Pod{}, kl.pods...), nil
|
||||
return append([]api.Pod{}, kl.pods...), kl.mirrorPods, nil
|
||||
}
|
||||
|
||||
// GetPodByName provides the first pod that matches namespace and name, as well as whether the node was found.
|
||||
|
@ -53,11 +53,12 @@ func init() {
|
||||
}
|
||||
|
||||
type TestKubelet struct {
|
||||
kubelet *Kubelet
|
||||
fakeDocker *dockertools.FakeDockerClient
|
||||
fakeCadvisor *cadvisor.Mock
|
||||
fakeKubeClient *client.Fake
|
||||
waitGroup *sync.WaitGroup
|
||||
kubelet *Kubelet
|
||||
fakeDocker *dockertools.FakeDockerClient
|
||||
fakeCadvisor *cadvisor.Mock
|
||||
fakeKubeClient *client.Fake
|
||||
waitGroup *sync.WaitGroup
|
||||
fakeMirrorManager *fakeMirrorManager
|
||||
}
|
||||
|
||||
func newTestKubelet(t *testing.T) *TestKubelet {
|
||||
@ -83,8 +84,8 @@ func newTestKubelet(t *testing.T) *TestKubelet {
|
||||
waitGroup := new(sync.WaitGroup)
|
||||
kubelet.podWorkers = newPodWorkers(
|
||||
fakeDockerCache,
|
||||
func(pod *api.Pod, containers dockertools.DockerContainers) error {
|
||||
err := kubelet.syncPod(pod, containers)
|
||||
func(pod *api.Pod, hasMirrorPod bool, containers dockertools.DockerContainers) error {
|
||||
err := kubelet.syncPod(pod, hasMirrorPod, containers)
|
||||
waitGroup.Done()
|
||||
return err
|
||||
},
|
||||
@ -100,8 +101,9 @@ func newTestKubelet(t *testing.T) *TestKubelet {
|
||||
}
|
||||
mockCadvisor := &cadvisor.Mock{}
|
||||
kubelet.cadvisor = mockCadvisor
|
||||
|
||||
return &TestKubelet{kubelet, fakeDocker, mockCadvisor, fakeKubeClient, waitGroup}
|
||||
mirrorManager := newFakeMirrorMananger()
|
||||
kubelet.mirrorManager = mirrorManager
|
||||
return &TestKubelet{kubelet, fakeDocker, mockCadvisor, fakeKubeClient, waitGroup, mirrorManager}
|
||||
}
|
||||
|
||||
func verifyCalls(t *testing.T, fakeDocker *dockertools.FakeDockerClient, calls []string) {
|
||||
@ -442,7 +444,7 @@ func TestSyncPodsDoesNothing(t *testing.T) {
|
||||
},
|
||||
}
|
||||
waitGroup.Add(1)
|
||||
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, time.Now())
|
||||
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, util.NewStringSet(), time.Now())
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
@ -475,7 +477,7 @@ func TestSyncPodsWithTerminationLog(t *testing.T) {
|
||||
},
|
||||
}
|
||||
waitGroup.Add(1)
|
||||
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, time.Now())
|
||||
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, util.NewStringSet(), time.Now())
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
@ -524,7 +526,7 @@ func TestSyncPodsCreatesNetAndContainer(t *testing.T) {
|
||||
},
|
||||
}
|
||||
waitGroup.Add(1)
|
||||
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, time.Now())
|
||||
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, util.NewStringSet(), time.Now())
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
@ -577,7 +579,7 @@ func TestSyncPodsCreatesNetAndContainerPullsImage(t *testing.T) {
|
||||
},
|
||||
}
|
||||
waitGroup.Add(1)
|
||||
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, time.Now())
|
||||
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, util.NewStringSet(), time.Now())
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
@ -627,7 +629,7 @@ func TestSyncPodsWithPodInfraCreatesContainer(t *testing.T) {
|
||||
},
|
||||
}
|
||||
waitGroup.Add(1)
|
||||
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, time.Now())
|
||||
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, util.NewStringSet(), time.Now())
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
@ -684,7 +686,7 @@ func TestSyncPodsWithPodInfraCreatesContainerCallsHandler(t *testing.T) {
|
||||
},
|
||||
}
|
||||
waitGroup.Add(1)
|
||||
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, time.Now())
|
||||
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, util.NewStringSet(), time.Now())
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
@ -753,7 +755,7 @@ func TestSyncPodsDeletesWithNoPodInfraContainer(t *testing.T) {
|
||||
},
|
||||
}
|
||||
waitGroup.Add(2)
|
||||
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, time.Now())
|
||||
err := kubelet.SyncPods(kubelet.pods, emptyPodUIDs, util.NewStringSet(), time.Now())
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
@ -793,7 +795,7 @@ func TestSyncPodsDeletesWhenSourcesAreReady(t *testing.T) {
|
||||
ID: "9876",
|
||||
},
|
||||
}
|
||||
if err := kubelet.SyncPods([]api.Pod{}, emptyPodUIDs, time.Now()); err != nil {
|
||||
if err := kubelet.SyncPods([]api.Pod{}, emptyPodUIDs, util.NewStringSet(), time.Now()); err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
// Validate nothing happened.
|
||||
@ -801,7 +803,7 @@ func TestSyncPodsDeletesWhenSourcesAreReady(t *testing.T) {
|
||||
fakeDocker.ClearCalls()
|
||||
|
||||
ready = true
|
||||
if err := kubelet.SyncPods([]api.Pod{}, emptyPodUIDs, time.Now()); err != nil {
|
||||
if err := kubelet.SyncPods([]api.Pod{}, emptyPodUIDs, util.NewStringSet(), time.Now()); err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
verifyCalls(t, fakeDocker, []string{"list", "stop", "stop", "inspect_container", "inspect_container"})
|
||||
@ -839,7 +841,7 @@ func TestSyncPodsDeletes(t *testing.T) {
|
||||
ID: "4567",
|
||||
},
|
||||
}
|
||||
err := kubelet.SyncPods([]api.Pod{}, emptyPodUIDs, time.Now())
|
||||
err := kubelet.SyncPods([]api.Pod{}, emptyPodUIDs, util.NewStringSet(), time.Now())
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
@ -893,7 +895,7 @@ func TestSyncPodDeletesDuplicate(t *testing.T) {
|
||||
},
|
||||
}
|
||||
kubelet.pods = append(kubelet.pods, bound)
|
||||
err := kubelet.syncPod(&bound, dockerContainers)
|
||||
err := kubelet.syncPod(&bound, false, dockerContainers)
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
@ -934,7 +936,7 @@ func TestSyncPodBadHash(t *testing.T) {
|
||||
},
|
||||
}
|
||||
kubelet.pods = append(kubelet.pods, bound)
|
||||
err := kubelet.syncPod(&bound, dockerContainers)
|
||||
err := kubelet.syncPod(&bound, false, dockerContainers)
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
@ -988,7 +990,7 @@ func TestSyncPodUnhealthy(t *testing.T) {
|
||||
},
|
||||
}
|
||||
kubelet.pods = append(kubelet.pods, bound)
|
||||
err := kubelet.syncPod(&bound, dockerContainers)
|
||||
err := kubelet.syncPod(&bound, false, dockerContainers)
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
@ -1678,7 +1680,7 @@ func TestSyncPodEventHandlerFails(t *testing.T) {
|
||||
},
|
||||
}
|
||||
kubelet.pods = append(kubelet.pods, bound)
|
||||
err := kubelet.syncPod(&bound, dockerContainers)
|
||||
err := kubelet.syncPod(&bound, false, dockerContainers)
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
@ -2075,7 +2077,7 @@ func TestSyncPodsWithPullPolicy(t *testing.T) {
|
||||
},
|
||||
},
|
||||
},
|
||||
}, emptyPodUIDs, time.Now())
|
||||
}, emptyPodUIDs, util.NewStringSet(), time.Now())
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
@ -3176,7 +3178,7 @@ func TestPurgingObsoleteStatusMapEntries(t *testing.T) {
|
||||
t.Fatalf("expected length of status map to be 1. Got map %#v.", kl.podStatuses)
|
||||
}
|
||||
// Sync with empty pods so that the entry in status map will be removed.
|
||||
kl.SyncPods([]api.Pod{}, emptyPodUIDs, time.Now())
|
||||
kl.SyncPods([]api.Pod{}, emptyPodUIDs, util.NewStringSet(), time.Now())
|
||||
if len(kl.podStatuses) != 0 {
|
||||
t.Fatalf("expected length of status map to be 0. Got map %#v.", kl.podStatuses)
|
||||
}
|
||||
@ -3388,3 +3390,57 @@ func TestUpdateNodeStatusError(t *testing.T) {
|
||||
t.Errorf("unexpected actions: %v", kubeClient.Actions)
|
||||
}
|
||||
}
|
||||
|
||||
func TestCreateMirrorPod(t *testing.T) {
|
||||
testKubelet := newTestKubelet(t)
|
||||
kl := testKubelet.kubelet
|
||||
manager := testKubelet.fakeMirrorManager
|
||||
pod := api.Pod{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
UID: "12345678",
|
||||
Name: "bar",
|
||||
Namespace: "foo",
|
||||
Annotations: map[string]string{
|
||||
ConfigSourceAnnotationKey: "file",
|
||||
},
|
||||
},
|
||||
}
|
||||
kl.pods = append(kl.pods, pod)
|
||||
hasMirrorPod := false
|
||||
err := kl.syncPod(&pod, hasMirrorPod, dockertools.DockerContainers{})
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
podFullName := GetPodFullName(&pod)
|
||||
if !manager.HasPod(podFullName) {
|
||||
t.Errorf("expected mirror pod %q to be created", podFullName)
|
||||
}
|
||||
if manager.NumOfPods() != 1 || !manager.HasPod(podFullName) {
|
||||
t.Errorf("expected one mirror pod %q, got %v", podFullName, manager.GetPods())
|
||||
}
|
||||
}
|
||||
|
||||
func TestDeleteOrphanedMirrorPods(t *testing.T) {
|
||||
testKubelet := newTestKubelet(t)
|
||||
kl := testKubelet.kubelet
|
||||
manager := testKubelet.fakeMirrorManager
|
||||
orphanedPodNames := []string{"pod1_ns", "pod2_ns"}
|
||||
mirrorPods := util.NewStringSet()
|
||||
for _, name := range orphanedPodNames {
|
||||
mirrorPods.Insert(name)
|
||||
}
|
||||
// Sync with an empty pod list to delete all mirror pods.
|
||||
err := kl.SyncPods([]api.Pod{}, emptyPodUIDs, mirrorPods, time.Now())
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error: %v", err)
|
||||
}
|
||||
if manager.NumOfPods() != 0 {
|
||||
t.Errorf("expected zero mirror pods, got %v", manager.GetPods())
|
||||
}
|
||||
for _, name := range orphanedPodNames {
|
||||
creates, deletes := manager.GetCounts(name)
|
||||
if creates != 0 || deletes != 1 {
|
||||
t.Errorf("expected 0 creation and one deletion of %q, got %d, %d", name, creates, deletes)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
138
pkg/kubelet/mirror_manager.go
Normal file
138
pkg/kubelet/mirror_manager.go
Normal file
@ -0,0 +1,138 @@
|
||||
/*
|
||||
Copyright 2015 Google Inc. All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package kubelet
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
||||
"github.com/golang/glog"
|
||||
)
|
||||
|
||||
// Kubelet discover pod updates from 3 sources: file, http, and apiserver.
|
||||
// Pods from non-apiserver sources are called static pods, and API server is
|
||||
// not aware of the existence of static pods. In order to monitor the status of
|
||||
// such pods, kubelet create a mirror pod for each static pod via the API
|
||||
// server.
|
||||
//
|
||||
// A mirror pod has the same pod full name (name and namespace) as its static
|
||||
// counterpart (albeit different metadata such as UID, etc). By leveraging the
|
||||
// fact that kubelet reports the pod status using the pod full name, the status
|
||||
// of the mirror pod always reflects the acutal status of the static pod.
|
||||
// When a static pod gets deleted, the associated orphaned mirror pods will
|
||||
// also be removed.
|
||||
//
|
||||
// This file includes functions to manage the mirror pods.
|
||||
|
||||
type mirrorManager interface {
|
||||
CreateMirrorPod(api.Pod, string) error
|
||||
DeleteMirrorPod(string) error
|
||||
}
|
||||
|
||||
type basicMirrorManager struct {
|
||||
// mirror pods are stored in the kubelet directly because they need to be
|
||||
// in sync with the internal pods.
|
||||
apiserverClient client.Interface
|
||||
}
|
||||
|
||||
func newBasicMirrorManager(apiserverClient client.Interface) *basicMirrorManager {
|
||||
return &basicMirrorManager{apiserverClient: apiserverClient}
|
||||
}
|
||||
|
||||
// Creates a mirror pod.
|
||||
func (self *basicMirrorManager) CreateMirrorPod(pod api.Pod, hostname string) error {
|
||||
if self.apiserverClient == nil {
|
||||
return nil
|
||||
}
|
||||
// Indicate that the pod should be scheduled to the current node.
|
||||
pod.Spec.Host = hostname
|
||||
pod.Annotations[ConfigMirrorAnnotationKey] = MirrorType
|
||||
|
||||
_, err := self.apiserverClient.Pods(NamespaceDefault).Create(&pod)
|
||||
return err
|
||||
}
|
||||
|
||||
// Deletes a mirror pod.
|
||||
func (self *basicMirrorManager) DeleteMirrorPod(podFullName string) error {
|
||||
if self.apiserverClient == nil {
|
||||
return nil
|
||||
}
|
||||
name, namespace, err := ParsePodFullName(podFullName)
|
||||
if err != nil {
|
||||
glog.Errorf("Failed to parse a pod full name %q", podFullName)
|
||||
return err
|
||||
}
|
||||
glog.V(4).Infof("Deleting a mirror pod %q", podFullName)
|
||||
if err := self.apiserverClient.Pods(namespace).Delete(name); err != nil {
|
||||
glog.Errorf("Failed deleting a mirror pod %q: %v", podFullName, err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Delete all orphaned mirror pods.
|
||||
func deleteOrphanedMirrorPods(pods []api.Pod, mirrorPods util.StringSet, manager mirrorManager) {
|
||||
existingPods := util.NewStringSet()
|
||||
for _, pod := range pods {
|
||||
existingPods.Insert(GetPodFullName(&pod))
|
||||
}
|
||||
for podFullName := range mirrorPods {
|
||||
if !existingPods.Has(podFullName) {
|
||||
manager.DeleteMirrorPod(podFullName)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Helper functions.
|
||||
func getPodSource(pod *api.Pod) (string, error) {
|
||||
if pod.Annotations != nil {
|
||||
if source, ok := pod.Annotations[ConfigSourceAnnotationKey]; ok {
|
||||
return source, nil
|
||||
}
|
||||
}
|
||||
return "", fmt.Errorf("cannot get source of pod %q", pod.UID)
|
||||
}
|
||||
|
||||
func isStaticPod(pod *api.Pod) bool {
|
||||
source, err := getPodSource(pod)
|
||||
return err == nil && source != ApiserverSource
|
||||
}
|
||||
|
||||
func isMirrorPod(pod *api.Pod) bool {
|
||||
if value, ok := pod.Annotations[ConfigMirrorAnnotationKey]; !ok {
|
||||
return false
|
||||
} else {
|
||||
return value == MirrorType
|
||||
}
|
||||
}
|
||||
|
||||
// This function separate the mirror pods from regular pods to
|
||||
// facilitate pods syncing and mirror pod creation/deletion.
|
||||
func filterAndCategorizePods(pods []api.Pod) ([]api.Pod, util.StringSet) {
|
||||
filteredPods := []api.Pod{}
|
||||
mirrorPods := util.NewStringSet()
|
||||
for _, pod := range pods {
|
||||
name := GetPodFullName(&pod)
|
||||
if isMirrorPod(&pod) {
|
||||
mirrorPods.Insert(name)
|
||||
} else {
|
||||
filteredPods = append(filteredPods, pod)
|
||||
}
|
||||
}
|
||||
return filteredPods, mirrorPods
|
||||
}
|
158
pkg/kubelet/mirror_manager_test.go
Normal file
158
pkg/kubelet/mirror_manager_test.go
Normal file
@ -0,0 +1,158 @@
|
||||
/*
|
||||
Copyright 2015 Google Inc. All rights reserved.
|
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License");
|
||||
you may not use this file except in compliance with the License.
|
||||
You may obtain a copy of the License at
|
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0
|
||||
|
||||
Unless required by applicable law or agreed to in writing, software
|
||||
distributed under the License is distributed on an "AS IS" BASIS,
|
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
See the License for the specific language governing permissions and
|
||||
limitations under the License.
|
||||
*/
|
||||
|
||||
package kubelet
|
||||
|
||||
import (
|
||||
"reflect"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
||||
)
|
||||
|
||||
type fakeMirrorManager struct {
|
||||
mirrorPodLock sync.RWMutex
|
||||
// Note that a real mirror manager does not store the mirror pods in
|
||||
// itself. This fake manager does this to track calls.
|
||||
mirrorPods util.StringSet
|
||||
createCounts map[string]int
|
||||
deleteCounts map[string]int
|
||||
}
|
||||
|
||||
func (self *fakeMirrorManager) CreateMirrorPod(pod api.Pod, _ string) error {
|
||||
self.mirrorPodLock.Lock()
|
||||
defer self.mirrorPodLock.Unlock()
|
||||
podFullName := GetPodFullName(&pod)
|
||||
self.mirrorPods.Insert(podFullName)
|
||||
self.createCounts[podFullName]++
|
||||
return nil
|
||||
}
|
||||
|
||||
func (self *fakeMirrorManager) DeleteMirrorPod(podFullName string) error {
|
||||
self.mirrorPodLock.Lock()
|
||||
defer self.mirrorPodLock.Unlock()
|
||||
self.mirrorPods.Delete(podFullName)
|
||||
self.deleteCounts[podFullName]++
|
||||
return nil
|
||||
}
|
||||
|
||||
func newFakeMirrorMananger() *fakeMirrorManager {
|
||||
m := fakeMirrorManager{}
|
||||
m.mirrorPods = util.NewStringSet()
|
||||
m.createCounts = make(map[string]int)
|
||||
m.deleteCounts = make(map[string]int)
|
||||
return &m
|
||||
}
|
||||
|
||||
func (self *fakeMirrorManager) HasPod(podFullName string) bool {
|
||||
self.mirrorPodLock.RLock()
|
||||
defer self.mirrorPodLock.RUnlock()
|
||||
return self.mirrorPods.Has(podFullName)
|
||||
}
|
||||
|
||||
func (self *fakeMirrorManager) NumOfPods() int {
|
||||
self.mirrorPodLock.RLock()
|
||||
defer self.mirrorPodLock.RUnlock()
|
||||
return self.mirrorPods.Len()
|
||||
}
|
||||
|
||||
func (self *fakeMirrorManager) GetPods() []string {
|
||||
self.mirrorPodLock.RLock()
|
||||
defer self.mirrorPodLock.RUnlock()
|
||||
return self.mirrorPods.List()
|
||||
}
|
||||
|
||||
func (self *fakeMirrorManager) GetCounts(podFullName string) (int, int) {
|
||||
self.mirrorPodLock.RLock()
|
||||
defer self.mirrorPodLock.RUnlock()
|
||||
return self.createCounts[podFullName], self.deleteCounts[podFullName]
|
||||
}
|
||||
|
||||
// Tests that mirror pods are filtered out properly from the pod update.
|
||||
func TestFilterOutMirrorPods(t *testing.T) {
|
||||
mirrorPod := api.Pod{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
UID: "987654321",
|
||||
Name: "bar",
|
||||
Namespace: "default",
|
||||
Annotations: map[string]string{
|
||||
ConfigSourceAnnotationKey: "api",
|
||||
ConfigMirrorAnnotationKey: "mirror",
|
||||
},
|
||||
},
|
||||
}
|
||||
staticPod := api.Pod{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
UID: "123456789",
|
||||
Name: "bar",
|
||||
Namespace: "default",
|
||||
Annotations: map[string]string{ConfigSourceAnnotationKey: "file"},
|
||||
},
|
||||
}
|
||||
|
||||
expectedPods := []api.Pod{
|
||||
{
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
UID: "999999999",
|
||||
Name: "taco",
|
||||
Namespace: "default",
|
||||
Annotations: map[string]string{ConfigSourceAnnotationKey: "api"},
|
||||
},
|
||||
},
|
||||
staticPod,
|
||||
}
|
||||
updates := append(expectedPods, mirrorPod)
|
||||
actualPods, actualMirrorPods := filterAndCategorizePods(updates)
|
||||
if !reflect.DeepEqual(expectedPods, actualPods) {
|
||||
t.Errorf("expected %#v, got %#v", expectedPods, actualPods)
|
||||
}
|
||||
if !actualMirrorPods.Has(GetPodFullName(&mirrorPod)) {
|
||||
t.Errorf("mirror pod is not recorded")
|
||||
}
|
||||
}
|
||||
|
||||
func TestParsePodFullName(t *testing.T) {
|
||||
type nameTuple struct {
|
||||
Name string
|
||||
Namespace string
|
||||
}
|
||||
successfulCases := map[string]nameTuple{
|
||||
"bar_foo": {Name: "bar", Namespace: "foo"},
|
||||
"bar.org_foo.com": {Name: "bar.org", Namespace: "foo.com"},
|
||||
"bar-bar_foo": {Name: "bar-bar", Namespace: "foo"},
|
||||
}
|
||||
failedCases := []string{"barfoo", "bar_foo_foo", ""}
|
||||
|
||||
for podFullName, expected := range successfulCases {
|
||||
name, namespace, err := ParsePodFullName(podFullName)
|
||||
if err != nil {
|
||||
t.Errorf("unexpected error when parsing the full name: %v", err)
|
||||
continue
|
||||
}
|
||||
if name != expected.Name || namespace != expected.Namespace {
|
||||
t.Errorf("expected name %q, namespace %q; got name %q, namespace %q",
|
||||
expected.Name, expected.Namespace, name, namespace)
|
||||
}
|
||||
}
|
||||
for _, podFullName := range failedCases {
|
||||
_, _, err := ParsePodFullName(podFullName)
|
||||
if err == nil {
|
||||
t.Errorf("expected error when parsing the full name, got none")
|
||||
}
|
||||
}
|
||||
}
|
@ -28,7 +28,7 @@ import (
|
||||
"github.com/golang/glog"
|
||||
)
|
||||
|
||||
type syncPodFnType func(*api.Pod, dockertools.DockerContainers) error
|
||||
type syncPodFnType func(*api.Pod, bool, dockertools.DockerContainers) error
|
||||
|
||||
type podWorkers struct {
|
||||
// Protects podUpdates field.
|
||||
@ -60,11 +60,15 @@ type workUpdate struct {
|
||||
// The pod state to reflect.
|
||||
pod *api.Pod
|
||||
|
||||
// Whether there exists a mirror pod for pod.
|
||||
hasMirrorPod bool
|
||||
|
||||
// Function to call when the update is complete.
|
||||
updateCompleteFn func()
|
||||
}
|
||||
|
||||
func newPodWorkers(dockerCache dockertools.DockerCache, syncPodFn syncPodFnType, recorder record.EventRecorder) *podWorkers {
|
||||
func newPodWorkers(dockerCache dockertools.DockerCache, syncPodFn syncPodFnType,
|
||||
recorder record.EventRecorder) *podWorkers {
|
||||
return &podWorkers{
|
||||
podUpdates: map[types.UID]chan workUpdate{},
|
||||
isWorking: map[types.UID]bool{},
|
||||
@ -92,7 +96,8 @@ func (p *podWorkers) managePodLoop(podUpdates <-chan workUpdate) {
|
||||
return
|
||||
}
|
||||
|
||||
err = p.syncPodFn(newWork.pod, containers.FindContainersByPod(newWork.pod.UID, GetPodFullName(newWork.pod)))
|
||||
err = p.syncPodFn(newWork.pod, newWork.hasMirrorPod,
|
||||
containers.FindContainersByPod(newWork.pod.UID, GetPodFullName(newWork.pod)))
|
||||
if err != nil {
|
||||
glog.Errorf("Error syncing pod %s, skipping: %v", newWork.pod.UID, err)
|
||||
p.recorder.Eventf(newWork.pod, "failedSync", "Error syncing pod, skipping: %v", err)
|
||||
@ -106,7 +111,7 @@ func (p *podWorkers) managePodLoop(podUpdates <-chan workUpdate) {
|
||||
}
|
||||
|
||||
// Apply the new setting to the specified pod. updateComplete is called when the update is completed.
|
||||
func (p *podWorkers) UpdatePod(pod *api.Pod, updateComplete func()) {
|
||||
func (p *podWorkers) UpdatePod(pod *api.Pod, hasMirrorPod bool, updateComplete func()) {
|
||||
uid := pod.UID
|
||||
var podUpdates chan workUpdate
|
||||
var exists bool
|
||||
@ -129,11 +134,13 @@ func (p *podWorkers) UpdatePod(pod *api.Pod, updateComplete func()) {
|
||||
p.isWorking[pod.UID] = true
|
||||
podUpdates <- workUpdate{
|
||||
pod: pod,
|
||||
hasMirrorPod: hasMirrorPod,
|
||||
updateCompleteFn: updateComplete,
|
||||
}
|
||||
} else {
|
||||
p.lastUndeliveredWorkUpdate[pod.UID] = workUpdate{
|
||||
pod: pod,
|
||||
hasMirrorPod: hasMirrorPod,
|
||||
updateCompleteFn: updateComplete,
|
||||
}
|
||||
}
|
||||
|
@ -46,7 +46,7 @@ func createPodWorkers() (*podWorkers, map[types.UID][]string) {
|
||||
|
||||
podWorkers := newPodWorkers(
|
||||
fakeDockerCache,
|
||||
func(pod *api.Pod, containers dockertools.DockerContainers) error {
|
||||
func(pod *api.Pod, hasMirrorPod bool, containers dockertools.DockerContainers) error {
|
||||
func() {
|
||||
lock.Lock()
|
||||
defer lock.Unlock()
|
||||
@ -54,7 +54,8 @@ func createPodWorkers() (*podWorkers, map[types.UID][]string) {
|
||||
}()
|
||||
return nil
|
||||
},
|
||||
recorder)
|
||||
recorder,
|
||||
)
|
||||
return podWorkers, processed
|
||||
}
|
||||
|
||||
@ -82,7 +83,7 @@ func TestUpdatePod(t *testing.T) {
|
||||
numPods := 20
|
||||
for i := 0; i < numPods; i++ {
|
||||
for j := i; j < numPods; j++ {
|
||||
podWorkers.UpdatePod(newPod(string(j), string(i)), func() {})
|
||||
podWorkers.UpdatePod(newPod(string(j), string(i)), false, func() {})
|
||||
}
|
||||
}
|
||||
drainWorkers(podWorkers, numPods)
|
||||
@ -115,7 +116,7 @@ func TestForgetNonExistingPodWorkers(t *testing.T) {
|
||||
|
||||
numPods := 20
|
||||
for i := 0; i < numPods; i++ {
|
||||
podWorkers.UpdatePod(newPod(string(i), "name"), func() {})
|
||||
podWorkers.UpdatePod(newPod(string(i), "name"), false, func() {})
|
||||
}
|
||||
drainWorkers(podWorkers, numPods)
|
||||
|
||||
|
@ -104,7 +104,9 @@ func (kl *Kubelet) runPod(pod api.Pod, retryDelay time.Duration) error {
|
||||
return nil
|
||||
}
|
||||
glog.Infof("pod %q containers not running: syncing", pod.Name)
|
||||
if err = kl.syncPod(&pod, dockerContainers); err != nil {
|
||||
// We don't create mirror pods in this mode; pass a dummy boolean value
|
||||
// to sycnPod.
|
||||
if err = kl.syncPod(&pod, false, dockerContainers); err != nil {
|
||||
return fmt.Errorf("error syncing pod: %v", err)
|
||||
}
|
||||
if retry >= RunOnceMaxRetries {
|
||||
|
@ -38,6 +38,7 @@ import (
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/httplog"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/httpstream"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/httpstream/spdy"
|
||||
"github.com/golang/glog"
|
||||
@ -84,7 +85,7 @@ type HostInterface interface {
|
||||
GetRootInfo(req *cadvisorApi.ContainerInfoRequest) (*cadvisorApi.ContainerInfo, error)
|
||||
GetDockerVersion() ([]uint, error)
|
||||
GetMachineInfo() (*cadvisorApi.MachineInfo, error)
|
||||
GetPods() ([]api.Pod, error)
|
||||
GetPods() ([]api.Pod, util.StringSet, error)
|
||||
GetPodByName(namespace, name string) (*api.Pod, bool)
|
||||
GetPodStatus(name string, uid types.UID) (api.PodStatus, error)
|
||||
RunInContainer(name string, uid types.UID, container string, cmd []string) ([]byte, error)
|
||||
@ -258,9 +259,9 @@ func (s *Server) handleContainerLogs(w http.ResponseWriter, req *http.Request) {
|
||||
}
|
||||
}
|
||||
|
||||
// handlePods returns a list of pod bounds to the Kubelet and their spec
|
||||
// handlePods returns a list of pod bound to the Kubelet and their spec
|
||||
func (s *Server) handlePods(w http.ResponseWriter, req *http.Request) {
|
||||
pods, err := s.host.GetPods()
|
||||
pods, _, err := s.host.GetPods()
|
||||
if err != nil {
|
||||
s.error(w, err)
|
||||
return
|
||||
|
@ -33,6 +33,7 @@ import (
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/types"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/httpstream"
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/util/httpstream/spdy"
|
||||
cadvisorApi "github.com/google/cadvisor/info/v1"
|
||||
@ -44,7 +45,7 @@ type fakeKubelet struct {
|
||||
containerInfoFunc func(podFullName string, uid types.UID, containerName string, req *cadvisorApi.ContainerInfoRequest) (*cadvisorApi.ContainerInfo, error)
|
||||
rootInfoFunc func(query *cadvisorApi.ContainerInfoRequest) (*cadvisorApi.ContainerInfo, error)
|
||||
machineInfoFunc func() (*cadvisorApi.MachineInfo, error)
|
||||
podsFunc func() ([]api.Pod, error)
|
||||
podsFunc func() ([]api.Pod, util.StringSet, error)
|
||||
logFunc func(w http.ResponseWriter, req *http.Request)
|
||||
runFunc func(podFullName string, uid types.UID, containerName string, cmd []string) ([]byte, error)
|
||||
dockerVersionFunc func() ([]uint, error)
|
||||
@ -79,7 +80,7 @@ func (fk *fakeKubelet) GetMachineInfo() (*cadvisorApi.MachineInfo, error) {
|
||||
return fk.machineInfoFunc()
|
||||
}
|
||||
|
||||
func (fk *fakeKubelet) GetPods() ([]api.Pod, error) {
|
||||
func (fk *fakeKubelet) GetPods() ([]api.Pod, util.StringSet, error) {
|
||||
return fk.podsFunc()
|
||||
}
|
||||
|
||||
|
@ -18,11 +18,13 @@ package kubelet
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||
)
|
||||
|
||||
const ConfigSourceAnnotationKey = "kubernetes.io/config.source"
|
||||
const ConfigMirrorAnnotationKey = "kubernetes.io/config.mirror"
|
||||
|
||||
// PodOperation defines what changes will be made on a pod configuration.
|
||||
type PodOperation int
|
||||
@ -49,6 +51,9 @@ const (
|
||||
// Updates from all sources
|
||||
AllSource = "*"
|
||||
|
||||
// Used for ConfigMirrorAnnotationKey.
|
||||
MirrorType = "mirror"
|
||||
|
||||
NamespaceDefault = api.NamespaceDefault
|
||||
)
|
||||
|
||||
@ -67,7 +72,7 @@ type PodUpdate struct {
|
||||
Source string
|
||||
}
|
||||
|
||||
// GetPodFullName returns a name that uniquely identifies a pod across all config sources.
|
||||
// GetPodFullName returns a name that uniquely identifies a pod.
|
||||
func GetPodFullName(pod *api.Pod) string {
|
||||
// Use underscore as the delimiter because it is not allowed in pod name
|
||||
// (DNS subdomain format), while allowed in the container name format.
|
||||
@ -78,3 +83,12 @@ func GetPodFullName(pod *api.Pod) string {
|
||||
func BuildPodFullName(name, namespace string) string {
|
||||
return name + "_" + namespace
|
||||
}
|
||||
|
||||
// Parse the pod full name.
|
||||
func ParsePodFullName(podFullName string) (string, string, error) {
|
||||
parts := strings.Split(podFullName, "_")
|
||||
if len(parts) != 2 {
|
||||
return "", "", fmt.Errorf("failed to parse the pod full name %q", podFullName)
|
||||
}
|
||||
return parts[0], parts[1], nil
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user