diff --git a/cmd/kubelet/kubelet.go b/cmd/kubelet/kubelet.go index d8a919be680..036fe3da176 100644 --- a/cmd/kubelet/kubelet.go +++ b/cmd/kubelet/kubelet.go @@ -48,21 +48,22 @@ import ( const defaultRootDir = "/var/lib/kubelet" var ( - config = flag.String("config", "", "Path to the config file or directory of files") - syncFrequency = flag.Duration("sync_frequency", 10*time.Second, "Max period between synchronizing running containers and config") - fileCheckFrequency = flag.Duration("file_check_frequency", 20*time.Second, "Duration between checking config files for new data") - httpCheckFrequency = flag.Duration("http_check_frequency", 20*time.Second, "Duration between checking http for new data") - manifestURL = flag.String("manifest_url", "", "URL for accessing the container manifest") - enableServer = flag.Bool("enable_server", true, "Enable the info server") - address = flag.String("address", "127.0.0.1", "The address for the info server to serve on (set to 0.0.0.0 or \"\" for all interfaces)") - port = flag.Uint("port", master.KubeletPort, "The port for the info server to serve on") - hostnameOverride = flag.String("hostname_override", "", "If non-empty, will use this string as identification instead of the actual hostname.") - dockerEndpoint = flag.String("docker_endpoint", "", "If non-empty, use this for the docker endpoint to communicate with") - etcdServerList util.StringList - rootDirectory = flag.String("root_dir", defaultRootDir, "Directory path for managing kubelet files (volume mounts,etc).") - allowPrivileged = flag.Bool("allow_privileged", false, "If true, allow containers to request privileged mode. [default=false]") - registryPullQPS = flag.Float64("registry_qps", 0.0, "If > 0, limit registry pull QPS to this value. If 0, unlimited. [default=0.0]") - registryBurst = flag.Int("registry_burst", 10, "Maximum size of a bursty pulls, temporarily allows pulls to burst to this number, while still not exceeding registry_qps. Only used if --registry_qps > 0") + config = flag.String("config", "", "Path to the config file or directory of files") + syncFrequency = flag.Duration("sync_frequency", 10*time.Second, "Max period between synchronizing running containers and config") + fileCheckFrequency = flag.Duration("file_check_frequency", 20*time.Second, "Duration between checking config files for new data") + httpCheckFrequency = flag.Duration("http_check_frequency", 20*time.Second, "Duration between checking http for new data") + manifestURL = flag.String("manifest_url", "", "URL for accessing the container manifest") + enableServer = flag.Bool("enable_server", true, "Enable the info server") + address = flag.String("address", "127.0.0.1", "The address for the info server to serve on (set to 0.0.0.0 or \"\" for all interfaces)") + port = flag.Uint("port", master.KubeletPort, "The port for the info server to serve on") + hostnameOverride = flag.String("hostname_override", "", "If non-empty, will use this string as identification instead of the actual hostname.") + networkContainerImage = flag.String("network_container_image", kubelet.NetworkContainerImage, "The image that network containers in each pod will use.") + dockerEndpoint = flag.String("docker_endpoint", "", "If non-empty, use this for the docker endpoint to communicate with") + etcdServerList util.StringList + rootDirectory = flag.String("root_dir", defaultRootDir, "Directory path for managing kubelet files (volume mounts,etc).") + allowPrivileged = flag.Bool("allow_privileged", false, "If true, allow containers to request privileged mode. [default=false]") + registryPullQPS = flag.Float64("registry_qps", 0.0, "If > 0, limit registry pull QPS to this value. If 0, unlimited. [default=0.0]") + registryBurst = flag.Int("registry_burst", 10, "Maximum size of a bursty pulls, temporarily allows pulls to burst to this number, while still not exceeding registry_qps. Only used if --registry_qps > 0") ) func init() { @@ -159,6 +160,7 @@ func main() { cadvisorClient, etcdClient, *rootDirectory, + *networkContainerImage, *syncFrequency, float32(*registryPullQPS), *registryBurst) diff --git a/pkg/kubelet/dockertools/fake_docker_client.go b/pkg/kubelet/dockertools/fake_docker_client.go index df636010010..13d0a47cb34 100644 --- a/pkg/kubelet/dockertools/fake_docker_client.go +++ b/pkg/kubelet/dockertools/fake_docker_client.go @@ -81,7 +81,7 @@ func (f *FakeDockerClient) CreateContainer(c docker.CreateContainerOptions) (*do // This is not a very good fake. We'll just add this container's name to the list. // Docker likes to add a '/', so copy that behavior. name := "/" + c.Name - f.ContainerList = append(f.ContainerList, docker.APIContainers{ID: name, Names: []string{name}}) + f.ContainerList = append(f.ContainerList, docker.APIContainers{ID: name, Names: []string{name}, Image: c.Config.Image}) return &docker.Container{ID: name}, nil } @@ -138,6 +138,7 @@ func (f *FakeDockerClient) InspectImage(name string) (*docker.Image, error) { type FakeDockerPuller struct { sync.Mutex + HasImages []string ImagesPulled []string // Every pull will return the first error here, and then reslice @@ -159,5 +160,15 @@ func (f *FakeDockerPuller) Pull(image string) (err error) { } func (f *FakeDockerPuller) IsImagePresent(name string) (bool, error) { - return true, nil + f.Lock() + defer f.Unlock() + if f.HasImages == nil { + return true, nil + } + for _, s := range f.HasImages { + if s == name { + return true, nil + } + } + return false, nil } diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index c5ed18ac84d..5a7033d082e 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -67,21 +67,23 @@ func NewMainKubelet( cc CadvisorInterface, ec tools.EtcdClient, rd string, + ni string, ri time.Duration, pullQPS float32, pullBurst int) *Kubelet { return &Kubelet{ - hostname: hn, - dockerClient: dc, - cadvisorClient: cc, - etcdClient: ec, - rootDirectory: rd, - resyncInterval: ri, - podWorkers: newPodWorkers(), - runner: dockertools.NewDockerContainerCommandRunner(), - httpClient: &http.Client{}, - pullQPS: pullQPS, - pullBurst: pullBurst, + hostname: hn, + dockerClient: dc, + cadvisorClient: cc, + etcdClient: ec, + rootDirectory: rd, + resyncInterval: ri, + networkContainerImage: ni, + podWorkers: newPodWorkers(), + runner: dockertools.NewDockerContainerCommandRunner(), + httpClient: &http.Client{}, + pullQPS: pullQPS, + pullBurst: pullBurst, } } @@ -89,11 +91,12 @@ func NewMainKubelet( // TODO: add more integration tests, and expand parameter list as needed. func NewIntegrationTestKubelet(hn string, dc dockertools.DockerInterface) *Kubelet { return &Kubelet{ - hostname: hn, - dockerClient: dc, - dockerPuller: &dockertools.FakeDockerPuller{}, - resyncInterval: 3 * time.Second, - podWorkers: newPodWorkers(), + hostname: hn, + dockerClient: dc, + dockerPuller: &dockertools.FakeDockerPuller{}, + networkContainerImage: NetworkContainerImage, + resyncInterval: 3 * time.Second, + podWorkers: newPodWorkers(), } } @@ -103,11 +106,12 @@ type httpGetInterface interface { // Kubelet is the main kubelet implementation. type Kubelet struct { - hostname string - dockerClient dockertools.DockerInterface - rootDirectory string - podWorkers podWorkers - resyncInterval time.Duration + hostname string + dockerClient dockertools.DockerInterface + rootDirectory string + networkContainerImage string + podWorkers podWorkers + resyncInterval time.Duration // Optional, no events will be sent without it etcdClient tools.EtcdClient @@ -368,7 +372,7 @@ func (kl *Kubelet) killContainerByID(ID, name string) error { const ( networkContainerName = "net" - networkContainerImage = "kubernetes/pause:latest" + NetworkContainerImage = "kubernetes/pause:latest" ) // createNetworkContainer starts the network container for a pod. Returns the docker container ID of the newly created container. @@ -381,12 +385,19 @@ func (kl *Kubelet) createNetworkContainer(pod *Pod) (dockertools.DockerID, error } container := &api.Container{ Name: networkContainerName, - Image: networkContainerImage, + Image: kl.networkContainerImage, Ports: ports, } - if err := kl.dockerPuller.Pull(networkContainerImage); err != nil { + // TODO: make this a TTL based pull (if image older than X policy, pull) + ok, err := kl.dockerPuller.IsImagePresent(container.Image) + if err != nil { return "", err } + if !ok { + if err := kl.dockerPuller.Pull(container.Image); err != nil { + return "", err + } + } return kl.runContainer(pod, container, nil, "") } diff --git a/pkg/kubelet/kubelet_test.go b/pkg/kubelet/kubelet_test.go index 5969f4f7542..0334f57d008 100644 --- a/pkg/kubelet/kubelet_test.go +++ b/pkg/kubelet/kubelet_test.go @@ -22,6 +22,7 @@ import ( "reflect" "regexp" "strconv" + "strings" "sync" "testing" "time" @@ -206,6 +207,7 @@ func matchString(t *testing.T, pattern, str string) bool { func TestSyncPodsCreatesNetAndContainer(t *testing.T) { kubelet, _, fakeDocker := newTestKubelet(t) + kubelet.networkContainerImage = "custom_image_name" fakeDocker.ContainerList = []docker.APIContainers{} err := kubelet.SyncPods([]Pod{ { @@ -228,6 +230,57 @@ func TestSyncPodsCreatesNetAndContainer(t *testing.T) { "list", "list", "create", "start", "list", "inspect", "list", "create", "start"}) fakeDocker.Lock() + + found := false + for _, c := range fakeDocker.ContainerList { + if c.Image == "custom_image_name" && strings.HasPrefix(c.Names[0], "/k8s_net") { + found = true + } + } + if !found { + t.Errorf("Custom net container not found: %v", fakeDocker.ContainerList) + } + + if len(fakeDocker.Created) != 2 || + !matchString(t, "k8s_net\\.[a-f0-9]+_foo.test_", fakeDocker.Created[0]) || + !matchString(t, "k8s_bar\\.[a-f0-9]+_foo.test_", fakeDocker.Created[1]) { + t.Errorf("Unexpected containers created %v", fakeDocker.Created) + } + fakeDocker.Unlock() +} + +func TestSyncPodsCreatesNetAndContainerPullsImage(t *testing.T) { + kubelet, _, fakeDocker := newTestKubelet(t) + puller := kubelet.dockerPuller.(*dockertools.FakeDockerPuller) + puller.HasImages = []string{} + kubelet.networkContainerImage = "custom_image_name" + fakeDocker.ContainerList = []docker.APIContainers{} + err := kubelet.SyncPods([]Pod{ + { + Name: "foo", + Namespace: "test", + Manifest: api.ContainerManifest{ + ID: "foo", + Containers: []api.Container{ + {Name: "bar"}, + }, + }, + }, + }) + if err != nil { + t.Errorf("unexpected error: %v", err) + } + kubelet.drainWorkers() + + verifyCalls(t, fakeDocker, []string{ + "list", "list", "create", "start", "list", "inspect", "list", "create", "start"}) + + fakeDocker.Lock() + + if !reflect.DeepEqual(puller.ImagesPulled, []string{"custom_image_name", ""}) { + t.Errorf("Unexpected pulled containers: %v", puller.ImagesPulled) + } + if len(fakeDocker.Created) != 2 || !matchString(t, "k8s_net\\.[a-f0-9]+_foo.test_", fakeDocker.Created[0]) || !matchString(t, "k8s_bar\\.[a-f0-9]+_foo.test_", fakeDocker.Created[1]) {