mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-25 12:43:23 +00:00
Merge pull request #6776 from yifan-gu/puller
kubelet/dockertools: Add puller interfaces in the containerManager.
This commit is contained in:
commit
26f8bc1a68
@ -96,8 +96,8 @@ type throttledDockerPuller struct {
|
|||||||
limiter util.RateLimiter
|
limiter util.RateLimiter
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewDockerPuller creates a new instance of the default implementation of DockerPuller.
|
// newDockerPuller creates a new instance of the default implementation of DockerPuller.
|
||||||
func NewDockerPuller(client DockerInterface, qps float32, burst int) DockerPuller {
|
func newDockerPuller(client DockerInterface, qps float32, burst int) DockerPuller {
|
||||||
dp := dockerPuller{
|
dp := dockerPuller{
|
||||||
client: client,
|
client: client,
|
||||||
keyring: credentialprovider.NewDockerKeyring(),
|
keyring: credentialprovider.NewDockerKeyring(),
|
||||||
|
@ -396,7 +396,7 @@ func TestIsImagePresent(t *testing.T) {
|
|||||||
func TestGetRunningContainers(t *testing.T) {
|
func TestGetRunningContainers(t *testing.T) {
|
||||||
fakeDocker := &FakeDockerClient{Errors: make(map[string]error)}
|
fakeDocker := &FakeDockerClient{Errors: make(map[string]error)}
|
||||||
fakeRecorder := &record.FakeRecorder{}
|
fakeRecorder := &record.FakeRecorder{}
|
||||||
containerManager := NewDockerManager(fakeDocker, fakeRecorder, PodInfraContainerImage)
|
containerManager := NewDockerManager(fakeDocker, fakeRecorder, PodInfraContainerImage, 0, 0)
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
containers map[string]*docker.Container
|
containers map[string]*docker.Container
|
||||||
inputIDs []string
|
inputIDs []string
|
||||||
|
@ -60,18 +60,25 @@ type DockerManager struct {
|
|||||||
// means that some entries may be recycled before a pod has been
|
// means that some entries may be recycled before a pod has been
|
||||||
// deleted.
|
// deleted.
|
||||||
reasonCache stringCache
|
reasonCache stringCache
|
||||||
|
// TODO(yifan): We export this for testability, so when we have a fake
|
||||||
|
// container manager, then we can unexport this. Also at that time, we
|
||||||
|
// use the concrete type so that we can record the pull failure and eliminate
|
||||||
|
// the image checking in GetPodStatus().
|
||||||
|
Puller DockerPuller
|
||||||
}
|
}
|
||||||
|
|
||||||
// Ensures DockerManager implements ConatinerRunner.
|
// Ensures DockerManager implements ConatinerRunner.
|
||||||
var _ kubecontainer.ContainerRunner = new(DockerManager)
|
var _ kubecontainer.ContainerRunner = new(DockerManager)
|
||||||
|
|
||||||
func NewDockerManager(client DockerInterface, recorder record.EventRecorder, podInfraContainerImage string) *DockerManager {
|
func NewDockerManager(client DockerInterface, recorder record.EventRecorder, podInfraContainerImage string, qps float32, burst int) *DockerManager {
|
||||||
reasonCache := stringCache{cache: lru.New(maxReasonCacheEntries)}
|
reasonCache := stringCache{cache: lru.New(maxReasonCacheEntries)}
|
||||||
return &DockerManager{
|
return &DockerManager{
|
||||||
client: client,
|
client: client,
|
||||||
recorder: recorder,
|
recorder: recorder,
|
||||||
PodInfraContainerImage: podInfraContainerImage,
|
PodInfraContainerImage: podInfraContainerImage,
|
||||||
reasonCache: reasonCache}
|
reasonCache: reasonCache,
|
||||||
|
Puller: newDockerPuller(client, qps, burst),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// A cache which stores strings keyed by <pod_UID>_<container_name>.
|
// A cache which stores strings keyed by <pod_UID>_<container_name>.
|
||||||
@ -569,3 +576,11 @@ func (self *DockerManager) GetPods(all bool) ([]*kubecontainer.Pod, error) {
|
|||||||
}
|
}
|
||||||
return result, nil
|
return result, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (self *DockerManager) Pull(image string) error {
|
||||||
|
return self.Puller.Pull(image)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (self *DockerManager) IsImagePresent(image string) (bool, error) {
|
||||||
|
return self.Puller.IsImagePresent(image)
|
||||||
|
}
|
||||||
|
@ -199,7 +199,7 @@ func NewMainKubelet(
|
|||||||
return nil, fmt.Errorf("failed to initialize image manager: %v", err)
|
return nil, fmt.Errorf("failed to initialize image manager: %v", err)
|
||||||
}
|
}
|
||||||
statusManager := newStatusManager(kubeClient)
|
statusManager := newStatusManager(kubeClient)
|
||||||
containerManager := dockertools.NewDockerManager(dockerClient, recorder, podInfraContainerImage)
|
containerManager := dockertools.NewDockerManager(dockerClient, recorder, podInfraContainerImage, pullQPS, pullBurst)
|
||||||
|
|
||||||
klet := &Kubelet{
|
klet := &Kubelet{
|
||||||
hostname: hostname,
|
hostname: hostname,
|
||||||
@ -211,8 +211,6 @@ func NewMainKubelet(
|
|||||||
readinessManager: kubecontainer.NewReadinessManager(),
|
readinessManager: kubecontainer.NewReadinessManager(),
|
||||||
runner: dockertools.NewDockerContainerCommandRunner(dockerClient),
|
runner: dockertools.NewDockerContainerCommandRunner(dockerClient),
|
||||||
httpClient: &http.Client{},
|
httpClient: &http.Client{},
|
||||||
pullQPS: pullQPS,
|
|
||||||
pullBurst: pullBurst,
|
|
||||||
sourcesReady: sourcesReady,
|
sourcesReady: sourcesReady,
|
||||||
clusterDomain: clusterDomain,
|
clusterDomain: clusterDomain,
|
||||||
clusterDNS: clusterDNS,
|
clusterDNS: clusterDNS,
|
||||||
@ -289,18 +287,12 @@ type Kubelet struct {
|
|||||||
// Tracks references for reporting events
|
// Tracks references for reporting events
|
||||||
containerRefManager *kubecontainer.RefManager
|
containerRefManager *kubecontainer.RefManager
|
||||||
|
|
||||||
// Optional, defaults to simple Docker implementation
|
|
||||||
dockerPuller dockertools.DockerPuller
|
|
||||||
// Optional, defaults to /logs/ from /var/log
|
// Optional, defaults to /logs/ from /var/log
|
||||||
logServer http.Handler
|
logServer http.Handler
|
||||||
// Optional, defaults to simple Docker implementation
|
// Optional, defaults to simple Docker implementation
|
||||||
runner dockertools.ContainerCommandRunner
|
runner dockertools.ContainerCommandRunner
|
||||||
// Optional, client for http requests, defaults to empty client
|
// Optional, client for http requests, defaults to empty client
|
||||||
httpClient httpGetter
|
httpClient httpGetter
|
||||||
// Optional, maximum pull QPS from the docker registry, 0.0 means unlimited.
|
|
||||||
pullQPS float32
|
|
||||||
// Optional, maximum burst QPS from the docker registry, must be positive if QPS is > 0.0
|
|
||||||
pullBurst int
|
|
||||||
|
|
||||||
// cAdvisor used for container information.
|
// cAdvisor used for container information.
|
||||||
cadvisor cadvisor.Interface
|
cadvisor cadvisor.Interface
|
||||||
@ -541,9 +533,6 @@ func (kl *Kubelet) Run(updates <-chan PodUpdate) {
|
|||||||
if kl.logServer == nil {
|
if kl.logServer == nil {
|
||||||
kl.logServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/")))
|
kl.logServer = http.StripPrefix("/logs/", http.FileServer(http.Dir("/var/log/")))
|
||||||
}
|
}
|
||||||
if kl.dockerPuller == nil {
|
|
||||||
kl.dockerPuller = dockertools.NewDockerPuller(kl.dockerClient, kl.pullQPS, kl.pullBurst)
|
|
||||||
}
|
|
||||||
if kl.kubeClient == nil {
|
if kl.kubeClient == nil {
|
||||||
glog.Warning("No api server defined - no node status update will be sent.")
|
glog.Warning("No api server defined - no node status update will be sent.")
|
||||||
}
|
}
|
||||||
@ -877,7 +866,7 @@ func (kl *Kubelet) createPodInfraContainer(pod *api.Pod) (dockertools.DockerID,
|
|||||||
glog.Errorf("Couldn't make a ref to pod %v, container %v: '%v'", pod.Name, container.Name, err)
|
glog.Errorf("Couldn't make a ref to pod %v, container %v: '%v'", pod.Name, container.Name, err)
|
||||||
}
|
}
|
||||||
// TODO: make this a TTL based pull (if image older than X policy, pull)
|
// TODO: make this a TTL based pull (if image older than X policy, pull)
|
||||||
ok, err := kl.dockerPuller.IsImagePresent(container.Image)
|
ok, err := kl.containerManager.IsImagePresent(container.Image)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if ref != nil {
|
if ref != nil {
|
||||||
kl.recorder.Eventf(ref, "failed", "Failed to inspect image %q: %v", container.Image, err)
|
kl.recorder.Eventf(ref, "failed", "Failed to inspect image %q: %v", container.Image, err)
|
||||||
@ -919,7 +908,7 @@ func (kl *Kubelet) pullImage(img string, ref *api.ObjectReference) error {
|
|||||||
metrics.ImagePullLatency.Observe(metrics.SinceInMicroseconds(start))
|
metrics.ImagePullLatency.Observe(metrics.SinceInMicroseconds(start))
|
||||||
}()
|
}()
|
||||||
|
|
||||||
if err := kl.dockerPuller.Pull(img); err != nil {
|
if err := kl.containerManager.Pull(img); err != nil {
|
||||||
if ref != nil {
|
if ref != nil {
|
||||||
kl.recorder.Eventf(ref, "failed", "Failed to pull image %q: %v", img, err)
|
kl.recorder.Eventf(ref, "failed", "Failed to pull image %q: %v", img, err)
|
||||||
}
|
}
|
||||||
@ -1033,7 +1022,7 @@ func (kl *Kubelet) pullImageAndRunContainer(pod *api.Pod, container *api.Contain
|
|||||||
glog.Errorf("Couldn't make a ref to pod %v, container %v: '%v'", pod.Name, container.Name, err)
|
glog.Errorf("Couldn't make a ref to pod %v, container %v: '%v'", pod.Name, container.Name, err)
|
||||||
}
|
}
|
||||||
if container.ImagePullPolicy != api.PullNever {
|
if container.ImagePullPolicy != api.PullNever {
|
||||||
present, err := kl.dockerPuller.IsImagePresent(container.Image)
|
present, err := kl.containerManager.IsImagePresent(container.Image)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if ref != nil {
|
if ref != nil {
|
||||||
kl.recorder.Eventf(ref, "failed", "Failed to inspect image %q: %v", container.Image, err)
|
kl.recorder.Eventf(ref, "failed", "Failed to inspect image %q: %v", container.Image, err)
|
||||||
|
@ -73,11 +73,10 @@ func newTestKubelet(t *testing.T) *TestKubelet {
|
|||||||
|
|
||||||
fakeRecorder := &record.FakeRecorder{}
|
fakeRecorder := &record.FakeRecorder{}
|
||||||
fakeKubeClient := &testclient.Fake{}
|
fakeKubeClient := &testclient.Fake{}
|
||||||
|
|
||||||
kubelet := &Kubelet{}
|
kubelet := &Kubelet{}
|
||||||
kubelet.dockerClient = fakeDocker
|
kubelet.dockerClient = fakeDocker
|
||||||
kubelet.kubeClient = fakeKubeClient
|
kubelet.kubeClient = fakeKubeClient
|
||||||
kubelet.dockerPuller = &dockertools.FakeDockerPuller{}
|
|
||||||
kubelet.hostname = "testnode"
|
kubelet.hostname = "testnode"
|
||||||
kubelet.networkPlugin, _ = network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil))
|
kubelet.networkPlugin, _ = network.InitNetworkPlugin([]network.NetworkPlugin{}, "", network.NewFakeHost(nil))
|
||||||
if tempDir, err := ioutil.TempDir("/tmp", "kubelet_test."); err != nil {
|
if tempDir, err := ioutil.TempDir("/tmp", "kubelet_test."); err != nil {
|
||||||
@ -104,7 +103,7 @@ func newTestKubelet(t *testing.T) *TestKubelet {
|
|||||||
podManager, fakeMirrorClient := newFakePodManager()
|
podManager, fakeMirrorClient := newFakePodManager()
|
||||||
kubelet.podManager = podManager
|
kubelet.podManager = podManager
|
||||||
kubelet.containerRefManager = kubecontainer.NewRefManager()
|
kubelet.containerRefManager = kubecontainer.NewRefManager()
|
||||||
kubelet.containerManager = dockertools.NewDockerManager(fakeDocker, fakeRecorder, dockertools.PodInfraContainerImage)
|
kubelet.containerManager = dockertools.NewDockerManager(fakeDocker, fakeRecorder, dockertools.PodInfraContainerImage, 0, 0)
|
||||||
kubelet.dockerCache = dockertools.NewFakeDockerCache(kubelet.containerManager)
|
kubelet.dockerCache = dockertools.NewFakeDockerCache(kubelet.containerManager)
|
||||||
kubelet.podWorkers = newPodWorkers(
|
kubelet.podWorkers = newPodWorkers(
|
||||||
kubelet.dockerCache,
|
kubelet.dockerCache,
|
||||||
@ -114,6 +113,7 @@ func newTestKubelet(t *testing.T) *TestKubelet {
|
|||||||
return err
|
return err
|
||||||
},
|
},
|
||||||
fakeRecorder)
|
fakeRecorder)
|
||||||
|
kubelet.containerManager.Puller = &dockertools.FakeDockerPuller{}
|
||||||
return &TestKubelet{kubelet, fakeDocker, mockCadvisor, fakeKubeClient, waitGroup, fakeMirrorClient}
|
return &TestKubelet{kubelet, fakeDocker, mockCadvisor, fakeKubeClient, waitGroup, fakeMirrorClient}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -593,7 +593,7 @@ func TestSyncPodsCreatesNetAndContainerPullsImage(t *testing.T) {
|
|||||||
kubelet := testKubelet.kubelet
|
kubelet := testKubelet.kubelet
|
||||||
fakeDocker := testKubelet.fakeDocker
|
fakeDocker := testKubelet.fakeDocker
|
||||||
waitGroup := testKubelet.waitGroup
|
waitGroup := testKubelet.waitGroup
|
||||||
puller := kubelet.dockerPuller.(*dockertools.FakeDockerPuller)
|
puller := kubelet.containerManager.Puller.(*dockertools.FakeDockerPuller)
|
||||||
puller.HasImages = []string{}
|
puller.HasImages = []string{}
|
||||||
kubelet.containerManager.PodInfraContainerImage = "custom_image_name"
|
kubelet.containerManager.PodInfraContainerImage = "custom_image_name"
|
||||||
fakeDocker.ContainerList = []docker.APIContainers{}
|
fakeDocker.ContainerList = []docker.APIContainers{}
|
||||||
@ -1249,7 +1249,6 @@ func TestGetRootInfo(t *testing.T) {
|
|||||||
|
|
||||||
kubelet := Kubelet{
|
kubelet := Kubelet{
|
||||||
dockerClient: &fakeDocker,
|
dockerClient: &fakeDocker,
|
||||||
dockerPuller: &dockertools.FakeDockerPuller{},
|
|
||||||
cadvisor: mockCadvisor,
|
cadvisor: mockCadvisor,
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -1652,7 +1651,7 @@ func TestSyncPodsWithPullPolicy(t *testing.T) {
|
|||||||
kubelet := testKubelet.kubelet
|
kubelet := testKubelet.kubelet
|
||||||
fakeDocker := testKubelet.fakeDocker
|
fakeDocker := testKubelet.fakeDocker
|
||||||
waitGroup := testKubelet.waitGroup
|
waitGroup := testKubelet.waitGroup
|
||||||
puller := kubelet.dockerPuller.(*dockertools.FakeDockerPuller)
|
puller := kubelet.containerManager.Puller.(*dockertools.FakeDockerPuller)
|
||||||
puller.HasImages = []string{"existing_one", "want:latest"}
|
puller.HasImages = []string{"existing_one", "want:latest"}
|
||||||
kubelet.containerManager.PodInfraContainerImage = "custom_image_name"
|
kubelet.containerManager.PodInfraContainerImage = "custom_image_name"
|
||||||
fakeDocker.ContainerList = []docker.APIContainers{}
|
fakeDocker.ContainerList = []docker.APIContainers{}
|
||||||
|
@ -40,7 +40,7 @@ func newPod(uid, name string) *api.Pod {
|
|||||||
func createPodWorkers() (*podWorkers, map[types.UID][]string) {
|
func createPodWorkers() (*podWorkers, map[types.UID][]string) {
|
||||||
fakeDocker := &dockertools.FakeDockerClient{}
|
fakeDocker := &dockertools.FakeDockerClient{}
|
||||||
fakeRecorder := &record.FakeRecorder{}
|
fakeRecorder := &record.FakeRecorder{}
|
||||||
fakeDockerCache := dockertools.NewFakeDockerCache(dockertools.NewDockerManager(fakeDocker, fakeRecorder, dockertools.PodInfraContainerImage))
|
fakeDockerCache := dockertools.NewFakeDockerCache(dockertools.NewDockerManager(fakeDocker, fakeRecorder, dockertools.PodInfraContainerImage, 0, 0))
|
||||||
|
|
||||||
lock := sync.Mutex{}
|
lock := sync.Mutex{}
|
||||||
processed := make(map[types.UID][]string)
|
processed := make(map[types.UID][]string)
|
||||||
|
@ -22,7 +22,6 @@ import (
|
|||||||
|
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
|
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/container"
|
||||||
"github.com/GoogleCloudPlatform/kubernetes/pkg/kubelet/dockertools"
|
|
||||||
"github.com/golang/glog"
|
"github.com/golang/glog"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -53,9 +52,6 @@ func (kl *Kubelet) RunOnce(updates <-chan PodUpdate) ([]RunPodResult, error) {
|
|||||||
|
|
||||||
// runOnce runs a given set of pods and returns their status.
|
// runOnce runs a given set of pods and returns their status.
|
||||||
func (kl *Kubelet) runOnce(pods []api.Pod, retryDelay time.Duration) (results []RunPodResult, err error) {
|
func (kl *Kubelet) runOnce(pods []api.Pod, retryDelay time.Duration) (results []RunPodResult, err error) {
|
||||||
if kl.dockerPuller == nil {
|
|
||||||
kl.dockerPuller = dockertools.NewDockerPuller(kl.dockerClient, kl.pullQPS, kl.pullBurst)
|
|
||||||
}
|
|
||||||
kl.handleNotFittingPods(pods)
|
kl.handleNotFittingPods(pods)
|
||||||
|
|
||||||
ch := make(chan RunPodResult)
|
ch := make(chan RunPodResult)
|
||||||
|
@ -144,8 +144,9 @@ func TestRunOnce(t *testing.T) {
|
|||||||
},
|
},
|
||||||
t: t,
|
t: t,
|
||||||
}
|
}
|
||||||
kb.dockerPuller = &dockertools.FakeDockerPuller{}
|
|
||||||
kb.containerManager = dockertools.NewDockerManager(kb.dockerClient, kb.recorder, dockertools.PodInfraContainerImage)
|
kb.containerManager = dockertools.NewDockerManager(kb.dockerClient, kb.recorder, dockertools.PodInfraContainerImage, 0, 0)
|
||||||
|
kb.containerManager.Puller = &dockertools.FakeDockerPuller{}
|
||||||
|
|
||||||
pods := []api.Pod{
|
pods := []api.Pod{
|
||||||
{
|
{
|
||||||
|
Loading…
Reference in New Issue
Block a user