From 4cedad1a9fb247bffd2c69f29bab0832fc010af0 Mon Sep 17 00:00:00 2001 From: wu8685 Date: Fri, 23 Sep 2016 09:51:12 +0800 Subject: [PATCH] fix issue #27137: kubelet detects pod manifest files in the directory using inotify --- pkg/kubelet/config/file.go | 81 +++-- pkg/kubelet/config/file_linux.go | 124 +++++++ pkg/kubelet/config/file_linux_test.go | 429 +++++++++++++++++++++++++ pkg/kubelet/config/file_test.go | 197 ------------ pkg/kubelet/config/file_unsupported.go | 26 ++ 5 files changed, 639 insertions(+), 218 deletions(-) create mode 100644 pkg/kubelet/config/file_linux.go create mode 100644 pkg/kubelet/config/file_linux_test.go delete mode 100644 pkg/kubelet/config/file_test.go create mode 100644 pkg/kubelet/config/file_unsupported.go diff --git a/pkg/kubelet/config/file.go b/pkg/kubelet/config/file.go index c701e767675..262f360e636 100644 --- a/pkg/kubelet/config/file.go +++ b/pkg/kubelet/config/file.go @@ -1,5 +1,5 @@ /* -Copyright 2014 The Kubernetes Authors. +Copyright 2016 The Kubernetes Authors. Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with the License. @@ -25,33 +25,50 @@ import ( "sort" "time" - "k8s.io/kubernetes/pkg/api" - kubetypes "k8s.io/kubernetes/pkg/kubelet/types" - "k8s.io/kubernetes/pkg/util/wait" - "github.com/golang/glog" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/client/cache" + kubetypes "k8s.io/kubernetes/pkg/kubelet/types" "k8s.io/kubernetes/pkg/types" + "k8s.io/kubernetes/pkg/util/wait" ) type sourceFile struct { - path string - nodeName types.NodeName - updates chan<- interface{} + path string + nodeName types.NodeName + store cache.Store + fileKeyMapping map[string]string + updates chan<- interface{} } func NewSourceFile(path string, nodeName types.NodeName, period time.Duration, updates chan<- interface{}) { - config := &sourceFile{ - path: path, - nodeName: nodeName, - updates: updates, - } + config := new(path, nodeName, period, updates) glog.V(1).Infof("Watching path %q", path) - go wait.Until(config.run, period, wait.NeverStop) + go wait.Forever(config.run, period) +} + +func new(path string, nodeName types.NodeName, period time.Duration, updates chan<- interface{}) *sourceFile { + send := func(objs []interface{}) { + var pods []*api.Pod + for _, o := range objs { + pods = append(pods, o.(*api.Pod)) + } + updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.FileSource} + } + store := cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc) + return &sourceFile{ + path: path, + nodeName: nodeName, + store: store, + fileKeyMapping: map[string]string{}, + updates: updates, + } } func (s *sourceFile) run() { - if err := s.extractFromPath(); err != nil { - glog.Errorf("Unable to read config path %q: %v", s.path, err) + if err := s.watch(); err != nil { + glog.Errorf("unable to read config path %q: %v", s.path, err) } } @@ -59,7 +76,7 @@ func (s *sourceFile) applyDefaults(pod *api.Pod, source string) error { return applyDefaults(pod, source, true, s.nodeName) } -func (s *sourceFile) extractFromPath() error { +func (s *sourceFile) resetStoreFromPath() error { path := s.path statInfo, err := os.Stat(path) if err != nil { @@ -77,20 +94,23 @@ func (s *sourceFile) extractFromPath() error { if err != nil { return err } - s.updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.FileSource} + if len(pods) == 0 { + // Emit an update with an empty PodList to allow FileSource to be marked as seen + s.updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.FileSource} + return nil + } + return s.replaceStore(pods...) case statInfo.Mode().IsRegular(): pod, err := s.extractFromFile(path) if err != nil { return err } - s.updates <- kubetypes.PodUpdate{Pods: []*api.Pod{pod}, Op: kubetypes.SET, Source: kubetypes.FileSource} + return s.replaceStore(pod) default: return fmt.Errorf("path is not a directory or file") } - - return nil } // Get as many pod configs as we can from a directory. Return an error if and only if something @@ -134,6 +154,17 @@ func (s *sourceFile) extractFromDir(name string) ([]*api.Pod, error) { func (s *sourceFile) extractFromFile(filename string) (pod *api.Pod, err error) { glog.V(3).Infof("Reading config file %q", filename) + defer func() { + if err == nil && pod != nil { + objKey, keyErr := cache.MetaNamespaceKeyFunc(pod) + if keyErr != nil { + err = keyErr + return + } + s.fileKeyMapping[filename] = objKey + } + }() + file, err := os.Open(filename) if err != nil { return pod, err @@ -160,3 +191,11 @@ func (s *sourceFile) extractFromFile(filename string) (pod *api.Pod, err error) return pod, fmt.Errorf("%v: read '%v', but couldn't parse as pod(%v).\n", filename, string(data), podErr) } + +func (s *sourceFile) replaceStore(pods ...*api.Pod) (err error) { + objs := []interface{}{} + for _, pod := range pods { + objs = append(objs, pod) + } + return s.store.Replace(objs, "") +} diff --git a/pkg/kubelet/config/file_linux.go b/pkg/kubelet/config/file_linux.go new file mode 100644 index 00000000000..0b936d66850 --- /dev/null +++ b/pkg/kubelet/config/file_linux.go @@ -0,0 +1,124 @@ +// +build linux + +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Reads the pod configuration from file or a directory of files. +package config + +import ( + "fmt" + "os" + + "github.com/golang/glog" + "golang.org/x/exp/inotify" + + "k8s.io/kubernetes/pkg/api" + kubetypes "k8s.io/kubernetes/pkg/kubelet/types" +) + +type podEventType int + +const ( + podAdd podEventType = iota + podModify + podDelete +) + +func (s *sourceFile) watch() error { + _, err := os.Stat(s.path) + if err != nil { + if !os.IsNotExist(err) { + return err + } + // Emit an update with an empty PodList to allow FileSource to be marked as seen + s.updates <- kubetypes.PodUpdate{Pods: []*api.Pod{}, Op: kubetypes.SET, Source: kubetypes.FileSource} + return fmt.Errorf("path does not exist, ignoring") + } + + w, err := inotify.NewWatcher() + if err != nil { + return fmt.Errorf("unable to create inotify: %v", err) + } + defer w.Close() + + err = w.AddWatch(s.path, inotify.IN_DELETE_SELF|inotify.IN_CREATE|inotify.IN_MOVED_TO|inotify.IN_MODIFY|inotify.IN_MOVED_FROM|inotify.IN_DELETE) + if err != nil { + return fmt.Errorf("unable to create inotify for path %q: %v", s.path, err) + } + + // Reset store with config files already existing when starting + if err := s.resetStoreFromPath(); err != nil { + return fmt.Errorf("unable to read config path %q: %v", s.path, err) + } + + for { + select { + case event := <-w.Event: + err = s.processEvent(event) + if err != nil { + return fmt.Errorf("error while processing event (%+v): %v", event, err) + } + case err = <-w.Error: + return fmt.Errorf("error while watching %q: %v", s.path, err) + } + } +} + +func (s *sourceFile) processEvent(e *inotify.Event) error { + var eventType podEventType + switch { + case (e.Mask & inotify.IN_ISDIR) > 0: + glog.V(1).Infof("Not recursing into config path %q", s.path) + return nil + case (e.Mask & inotify.IN_CREATE) > 0: + eventType = podAdd + case (e.Mask & inotify.IN_MOVED_TO) > 0: + eventType = podAdd + case (e.Mask & inotify.IN_MODIFY) > 0: + eventType = podModify + case (e.Mask & inotify.IN_DELETE) > 0: + eventType = podDelete + case (e.Mask & inotify.IN_MOVED_FROM) > 0: + eventType = podDelete + case (e.Mask & inotify.IN_DELETE_SELF) > 0: + return fmt.Errorf("the watched path is deleted") + default: + // Ignore rest events + return nil + } + + switch eventType { + case podAdd, podModify: + if pod, err := s.extractFromFile(e.Name); err != nil { + glog.Errorf("can't process config file %q: %v", e.Name, err) + } else { + return s.store.Add(pod) + } + case podDelete: + if objKey, keyExist := s.fileKeyMapping[e.Name]; keyExist { + pod, podExist, err := s.store.GetByKey(objKey) + if err != nil { + return err + } else if !podExist { + return fmt.Errorf("the pod with key %s doesn't exist in cache", objKey) + } else { + return s.store.Delete(pod) + } + } + } + return nil +} diff --git a/pkg/kubelet/config/file_linux_test.go b/pkg/kubelet/config/file_linux_test.go new file mode 100644 index 00000000000..0ef0e43d47e --- /dev/null +++ b/pkg/kubelet/config/file_linux_test.go @@ -0,0 +1,429 @@ +// +build linux + +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package config + +import ( + "fmt" + "io" + "io/ioutil" + "os" + "os/exec" + "path/filepath" + "sync" + "testing" + "time" + + "k8s.io/kubernetes/pkg/api" + "k8s.io/kubernetes/pkg/api/testapi" + "k8s.io/kubernetes/pkg/api/unversioned" + "k8s.io/kubernetes/pkg/api/validation" + kubetypes "k8s.io/kubernetes/pkg/kubelet/types" + "k8s.io/kubernetes/pkg/runtime" + "k8s.io/kubernetes/pkg/securitycontext" + "k8s.io/kubernetes/pkg/types" + utiltesting "k8s.io/kubernetes/pkg/util/testing" + "k8s.io/kubernetes/pkg/util/wait" +) + +func TestExtractFromNonExistentFile(t *testing.T) { + ch := make(chan interface{}, 1) + c := new("/some/fake/file", "localhost", time.Millisecond, ch) + err := c.watch() + if err == nil { + t.Errorf("Expected error") + } +} + +func TestUpdateOnNonExistentFile(t *testing.T) { + ch := make(chan interface{}) + NewSourceFile("random_non_existent_path", "localhost", time.Millisecond, ch) + select { + case got := <-ch: + update := got.(kubetypes.PodUpdate) + expected := CreatePodUpdate(kubetypes.SET, kubetypes.FileSource) + if !api.Semantic.DeepDerivative(expected, update) { + t.Fatalf("expected %#v, Got %#v", expected, update) + } + + case <-time.After(wait.ForeverTestTimeout): + t.Fatalf("expected update, timeout instead") + } +} + +func TestReadPodsFromFileExistAlready(t *testing.T) { + hostname := types.NodeName("random-test-hostname") + var testCases = getTestCases(hostname) + + for _, testCase := range testCases { + func() { + dirName, err := utiltesting.MkTmpdir("file-test") + if err != nil { + t.Fatalf("unable to create temp dir: %v", err) + } + defer os.RemoveAll(dirName) + file := testCase.writeToFile(dirName, "test_pod_config", t) + + ch := make(chan interface{}) + NewSourceFile(file, hostname, time.Millisecond, ch) + select { + case got := <-ch: + update := got.(kubetypes.PodUpdate) + for _, pod := range update.Pods { + if errs := validation.ValidatePod(pod); len(errs) > 0 { + t.Fatalf("%s: Invalid pod %#v, %#v", testCase.desc, pod, errs) + } + } + if !api.Semantic.DeepEqual(testCase.expected, update) { + t.Fatalf("%s: Expected %#v, Got %#v", testCase.desc, testCase.expected, update) + } + case <-time.After(wait.ForeverTestTimeout): + t.Fatalf("%s: Expected update, timeout instead", testCase.desc) + } + }() + } +} + +func TestReadPodsFromFileExistLater(t *testing.T) { + watchFileAdded(false, t) +} + +func TestReadPodsFromFileChanged(t *testing.T) { + watchFileChanged(false, t) +} + +func TestReadPodsFromFileInDirAdded(t *testing.T) { + watchFileAdded(true, t) +} + +func TestReadPodsFromFileInDirChanged(t *testing.T) { + watchFileChanged(true, t) +} + +func TestExtractFromBadDataFile(t *testing.T) { + dirName, err := utiltesting.MkTmpdir("file-test") + if err != nil { + t.Fatalf("unable to create temp dir: %v", err) + } + defer os.RemoveAll(dirName) + + fileName := filepath.Join(dirName, "test_pod_config") + err = ioutil.WriteFile(fileName, []byte{1, 2, 3}, 0555) + if err != nil { + t.Fatalf("unable to write test file %#v", err) + } + + ch := make(chan interface{}, 1) + c := new(fileName, "localhost", time.Millisecond, ch) + err = c.resetStoreFromPath() + if err == nil { + t.Fatalf("expected error, got nil") + } + expectEmptyChannel(t, ch) +} + +func TestExtractFromEmptyDir(t *testing.T) { + dirName, err := utiltesting.MkTmpdir("file-test") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + defer os.RemoveAll(dirName) + + ch := make(chan interface{}, 1) + c := new(dirName, "localhost", time.Millisecond, ch) + err = c.resetStoreFromPath() + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + update := (<-ch).(kubetypes.PodUpdate) + expected := CreatePodUpdate(kubetypes.SET, kubetypes.FileSource) + if !api.Semantic.DeepEqual(expected, update) { + t.Fatalf("expected %#v, Got %#v", expected, update) + } +} + +type testCase struct { + desc string + pod runtime.Object + expected kubetypes.PodUpdate +} + +func getTestCases(hostname types.NodeName) []*testCase { + grace := int64(30) + return []*testCase{ + { + desc: "Simple pod", + pod: &api.Pod{ + TypeMeta: unversioned.TypeMeta{ + Kind: "Pod", + APIVersion: "", + }, + ObjectMeta: api.ObjectMeta{ + Name: "test", + UID: "12345", + Namespace: "mynamespace", + }, + Spec: api.PodSpec{ + Containers: []api.Container{{Name: "image", Image: "test/image", SecurityContext: securitycontext.ValidSecurityContextWithContainerDefaults()}}, + SecurityContext: &api.PodSecurityContext{}, + }, + Status: api.PodStatus{ + Phase: api.PodPending, + }, + }, + expected: CreatePodUpdate(kubetypes.SET, kubetypes.FileSource, &api.Pod{ + ObjectMeta: api.ObjectMeta{ + Name: "test-" + string(hostname), + UID: "12345", + Namespace: "mynamespace", + Annotations: map[string]string{kubetypes.ConfigHashAnnotationKey: "12345"}, + SelfLink: getSelfLink("test-"+string(hostname), "mynamespace"), + }, + Spec: api.PodSpec{ + NodeName: string(hostname), + RestartPolicy: api.RestartPolicyAlways, + DNSPolicy: api.DNSClusterFirst, + TerminationGracePeriodSeconds: &grace, + Containers: []api.Container{{ + Name: "image", + Image: "test/image", + TerminationMessagePath: "/dev/termination-log", + ImagePullPolicy: "Always", + SecurityContext: securitycontext.ValidSecurityContextWithContainerDefaults()}}, + SecurityContext: &api.PodSecurityContext{}, + }, + Status: api.PodStatus{ + Phase: api.PodPending, + }, + }), + }, + } +} + +func (tc *testCase) writeToFile(dir, name string, t *testing.T) string { + var versionedPod runtime.Object + err := testapi.Default.Converter().Convert(&tc.pod, &versionedPod, nil) + if err != nil { + t.Fatalf("%s: error in versioning the pod: %v", tc.desc, err) + } + fileContents, err := runtime.Encode(testapi.Default.Codec(), versionedPod) + if err != nil { + t.Fatalf("%s: error in encoding the pod: %v", tc.desc, err) + } + + fileName := filepath.Join(dir, name) + if err := writeFile(fileName, []byte(fileContents)); err != nil { + t.Fatalf("unable to write test file %#v", err) + } + return fileName +} + +func watchFileAdded(watchDir bool, t *testing.T) { + hostname := types.NodeName("random-test-hostname") + var testCases = getTestCases(hostname) + + fileNamePre := "test_pod_config" + for index, testCase := range testCases { + func() { + dirName, err := utiltesting.MkTmpdir("dir-test") + if err != nil { + t.Fatalf("unable to create temp dir: %v", err) + } + defer os.RemoveAll(dirName) + fileName := fmt.Sprintf("%s_%d", fileNamePre, index) + + ch := make(chan interface{}) + if watchDir { + NewSourceFile(dirName, hostname, 100*time.Millisecond, ch) + } else { + NewSourceFile(filepath.Join(dirName, fileName), hostname, 100*time.Millisecond, ch) + } + expectEmptyUpdate(t, ch) + + addFile := func() { + // Add a file + testCase.writeToFile(dirName, fileName, t) + } + + if watchDir { + defer func() { + // Remove the file + deleteFile(dirName, fileName, ch, t) + }() + } + + go addFile() + + if watchDir { + // expect an update by CREATE inotify event + expectUpdate(t, ch, testCase) + // expect an update by MODIFY inotify event + expectUpdate(t, ch, testCase) + + from := fileName + fileName = fileName + "_ch" + go changeFileName(dirName, from, fileName, t) + // expect an update by MOVED_FROM inotify event cause changing file name + expectEmptyUpdate(t, ch) + // expect an update by MOVED_TO inotify event cause changing file name + expectUpdate(t, ch, testCase) + } else { + // expect an update by SourceFile.resetStoreFromPath() + expectUpdate(t, ch, testCase) + } + }() + } +} + +func watchFileChanged(watchDir bool, t *testing.T) { + hostname := types.NodeName("random-test-hostname") + var testCases = getTestCases(hostname) + + fileNamePre := "test_pod_config" + for index, testCase := range testCases { + func() { + dirName, err := utiltesting.MkTmpdir("dir-test") + fileName := fmt.Sprintf("%s_%d", fileNamePre, index) + if err != nil { + t.Fatalf("unable to create temp dir: %v", err) + } + defer os.RemoveAll(dirName) + + var file string + lock := &sync.Mutex{} + ch := make(chan interface{}) + func() { + lock.Lock() + defer lock.Unlock() + file = testCase.writeToFile(dirName, fileName, t) + }() + + if watchDir { + NewSourceFile(dirName, hostname, 100*time.Millisecond, ch) + defer func() { + // Remove the file + deleteFile(dirName, fileName, ch, t) + }() + } else { + NewSourceFile(file, hostname, 100*time.Millisecond, ch) + } + // expect an update by SourceFile.resetStoreFromPath() + expectUpdate(t, ch, testCase) + + changeFile := func() { + // Edit the file content + lock.Lock() + defer lock.Unlock() + + pod := testCase.pod.(*api.Pod) + pod.Spec.Containers[0].Name = "image2" + + testCase.expected.Pods[0].Spec.Containers[0].Name = "image2" + testCase.writeToFile(dirName, fileName, t) + } + + go changeFile() + // expect an update by MODIFY inotify event + expectUpdate(t, ch, testCase) + + if watchDir { + from := fileName + fileName = fileName + "_ch" + go changeFileName(dirName, from, fileName, t) + // expect an update by MOVED_FROM inotify event cause changing file name + expectEmptyUpdate(t, ch) + // expect an update by MOVED_TO inotify event cause changing file name + expectUpdate(t, ch, testCase) + } + }() + } +} + +func deleteFile(dir, file string, ch chan interface{}, t *testing.T) { + go func() { + path := filepath.Join(dir, file) + err := os.Remove(path) + if err != nil { + t.Errorf("unable to remove test file %s: %s", path, err) + } + }() + + expectEmptyUpdate(t, ch) +} + +func expectUpdate(t *testing.T, ch chan interface{}, testCase *testCase) { + timer := time.After(5 * time.Second) + for { + select { + case got := <-ch: + update := got.(kubetypes.PodUpdate) + for _, pod := range update.Pods { + if errs := validation.ValidatePod(pod); len(errs) > 0 { + t.Fatalf("%s: Invalid pod %#v, %#v", testCase.desc, pod, errs) + } + } + + if !api.Semantic.DeepEqual(testCase.expected, update) { + t.Fatalf("%s: Expected: %#v, Got: %#v", testCase.desc, testCase.expected, update) + } + return + case <-timer: + t.Fatalf("%s: Expected update, timeout instead", testCase.desc) + } + } +} + +func expectEmptyUpdate(t *testing.T, ch chan interface{}) { + timer := time.After(5 * time.Second) + for { + select { + case got := <-ch: + update := got.(kubetypes.PodUpdate) + if len(update.Pods) != 0 { + t.Fatalf("expected empty update, got %#v", update) + } + return + case <-timer: + t.Fatalf("expected empty update, timeout instead") + } + } +} + +func writeFile(filename string, data []byte) error { + f, err := os.OpenFile(filename, os.O_WRONLY|os.O_CREATE, 0666) + if err != nil { + return err + } + n, err := f.Write(data) + if err == nil && n < len(data) { + err = io.ErrShortWrite + } + if err1 := f.Close(); err == nil { + err = err1 + } + return err +} + +func changeFileName(dir, from, to string, t *testing.T) { + fromPath := filepath.Join(dir, from) + toPath := filepath.Join(dir, to) + if err := exec.Command("mv", fromPath, toPath).Run(); err != nil { + t.Errorf("Fail to change file name: %s", err) + } +} diff --git a/pkg/kubelet/config/file_test.go b/pkg/kubelet/config/file_test.go deleted file mode 100644 index 6dcc5141c66..00000000000 --- a/pkg/kubelet/config/file_test.go +++ /dev/null @@ -1,197 +0,0 @@ -/* -Copyright 2014 The Kubernetes Authors. - -Licensed under the Apache License, Version 2.0 (the "License"); -you may not use this file except in compliance with the License. -You may obtain a copy of the License at - - http://www.apache.org/licenses/LICENSE-2.0 - -Unless required by applicable law or agreed to in writing, software -distributed under the License is distributed on an "AS IS" BASIS, -WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -See the License for the specific language governing permissions and -limitations under the License. -*/ - -package config - -import ( - "io/ioutil" - "os" - "testing" - "time" - - "k8s.io/kubernetes/pkg/api" - "k8s.io/kubernetes/pkg/api/testapi" - "k8s.io/kubernetes/pkg/api/unversioned" - "k8s.io/kubernetes/pkg/api/validation" - kubetypes "k8s.io/kubernetes/pkg/kubelet/types" - "k8s.io/kubernetes/pkg/runtime" - "k8s.io/kubernetes/pkg/securitycontext" - "k8s.io/kubernetes/pkg/types" - utiltesting "k8s.io/kubernetes/pkg/util/testing" - "k8s.io/kubernetes/pkg/util/wait" -) - -func TestExtractFromNonExistentFile(t *testing.T) { - ch := make(chan interface{}, 1) - c := sourceFile{"/some/fake/file", "localhost", ch} - err := c.extractFromPath() - if err == nil { - t.Errorf("Expected error") - } -} - -func TestUpdateOnNonExistentFile(t *testing.T) { - ch := make(chan interface{}) - NewSourceFile("random_non_existent_path", "localhost", time.Millisecond, ch) - select { - case got := <-ch: - update := got.(kubetypes.PodUpdate) - expected := CreatePodUpdate(kubetypes.SET, kubetypes.FileSource) - if !api.Semantic.DeepDerivative(expected, update) { - t.Fatalf("Expected %#v, Got %#v", expected, update) - } - - case <-time.After(wait.ForeverTestTimeout): - t.Errorf("Expected update, timeout instead") - } -} - -func writeTestFile(t *testing.T, dir, name string, contents string) *os.File { - file, err := ioutil.TempFile(os.TempDir(), "test_pod_config") - if err != nil { - t.Fatalf("Unable to create test file %#v", err) - } - file.Close() - if err := ioutil.WriteFile(file.Name(), []byte(contents), 0555); err != nil { - t.Fatalf("Unable to write test file %#v", err) - } - return file -} - -func TestReadPodsFromFile(t *testing.T) { - nodeName := "random-test-hostname" - grace := int64(30) - var testCases = []struct { - desc string - pod runtime.Object - expected kubetypes.PodUpdate - }{ - { - desc: "Simple pod", - pod: &api.Pod{ - TypeMeta: unversioned.TypeMeta{ - Kind: "Pod", - APIVersion: "", - }, - ObjectMeta: api.ObjectMeta{ - Name: "test", - UID: "12345", - Namespace: "mynamespace", - }, - Spec: api.PodSpec{ - Containers: []api.Container{{Name: "image", Image: "test/image", SecurityContext: securitycontext.ValidSecurityContextWithContainerDefaults()}}, - SecurityContext: &api.PodSecurityContext{}, - }, - Status: api.PodStatus{ - Phase: api.PodPending, - }, - }, - expected: CreatePodUpdate(kubetypes.SET, kubetypes.FileSource, &api.Pod{ - ObjectMeta: api.ObjectMeta{ - Name: "test-" + nodeName, - UID: "12345", - Namespace: "mynamespace", - Annotations: map[string]string{kubetypes.ConfigHashAnnotationKey: "12345"}, - SelfLink: getSelfLink("test-"+nodeName, "mynamespace"), - }, - Spec: api.PodSpec{ - NodeName: nodeName, - RestartPolicy: api.RestartPolicyAlways, - DNSPolicy: api.DNSClusterFirst, - TerminationGracePeriodSeconds: &grace, - Containers: []api.Container{{ - Name: "image", - Image: "test/image", - TerminationMessagePath: "/dev/termination-log", - ImagePullPolicy: "Always", - SecurityContext: securitycontext.ValidSecurityContextWithContainerDefaults()}}, - SecurityContext: &api.PodSecurityContext{}, - }, - Status: api.PodStatus{ - Phase: api.PodPending, - }, - }), - }, - } - - for _, testCase := range testCases { - func() { - var versionedPod runtime.Object - err := testapi.Default.Converter().Convert(&testCase.pod, &versionedPod, nil) - if err != nil { - t.Fatalf("%s: error in versioning the pod: %v", testCase.desc, err) - } - fileContents, err := runtime.Encode(testapi.Default.Codec(), versionedPod) - if err != nil { - t.Fatalf("%s: error in encoding the pod: %v", testCase.desc, err) - } - - file := writeTestFile(t, os.TempDir(), "test_pod_config", string(fileContents)) - defer os.Remove(file.Name()) - - ch := make(chan interface{}) - NewSourceFile(file.Name(), types.NodeName(nodeName), time.Millisecond, ch) - select { - case got := <-ch: - update := got.(kubetypes.PodUpdate) - for _, pod := range update.Pods { - if errs := validation.ValidatePod(pod); len(errs) > 0 { - t.Errorf("%s: Invalid pod %#v, %#v", testCase.desc, pod, errs) - } - } - if !api.Semantic.DeepEqual(testCase.expected, update) { - t.Errorf("%s: Expected %#v, Got %#v", testCase.desc, testCase.expected, update) - } - case <-time.After(wait.ForeverTestTimeout): - t.Errorf("%s: Expected update, timeout instead", testCase.desc) - } - }() - } -} - -func TestExtractFromBadDataFile(t *testing.T) { - file := writeTestFile(t, os.TempDir(), "test_pod_config", string([]byte{1, 2, 3})) - defer os.Remove(file.Name()) - - ch := make(chan interface{}, 1) - c := sourceFile{file.Name(), "localhost", ch} - err := c.extractFromPath() - if err == nil { - t.Fatalf("Expected error") - } - expectEmptyChannel(t, ch) -} - -func TestExtractFromEmptyDir(t *testing.T) { - dirName, err := utiltesting.MkTmpdir("file-test") - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - defer os.RemoveAll(dirName) - - ch := make(chan interface{}, 1) - c := sourceFile{dirName, "localhost", ch} - err = c.extractFromPath() - if err != nil { - t.Fatalf("Unexpected error: %v", err) - } - - update := (<-ch).(kubetypes.PodUpdate) - expected := CreatePodUpdate(kubetypes.SET, kubetypes.FileSource) - if !api.Semantic.DeepEqual(expected, update) { - t.Errorf("Expected %#v, Got %#v", expected, update) - } -} diff --git a/pkg/kubelet/config/file_unsupported.go b/pkg/kubelet/config/file_unsupported.go new file mode 100644 index 00000000000..c30ede44331 --- /dev/null +++ b/pkg/kubelet/config/file_unsupported.go @@ -0,0 +1,26 @@ +// +build !linux + +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +// Reads the pod configuration from file or a directory of files. +package config + +import "errors" + +func (s *sourceFile) watch() error { + return errors.New("source file is unsupported in this build") +}