diff --git a/pkg/kubelet/config/BUILD b/pkg/kubelet/config/BUILD index 3f88dbf7248..7dd54bc3f4b 100644 --- a/pkg/kubelet/config/BUILD +++ b/pkg/kubelet/config/BUILD @@ -80,6 +80,7 @@ go_library( ] + select({ "@io_bazel_rules_go//go/platform:linux": [ "//vendor/golang.org/x/exp/inotify:go_default_library", + "//vendor/k8s.io/client-go/util/flowcontrol:go_default_library", ], "//conditions:default": [], }), @@ -91,6 +92,7 @@ go_test( "apiserver_test.go", "common_test.go", "config_test.go", + "file_test.go", "http_test.go", ] + select({ "@io_bazel_rules_go//go/platform:linux": [ diff --git a/pkg/kubelet/config/file.go b/pkg/kubelet/config/file.go index b96a790ec78..7fbf53349dc 100644 --- a/pkg/kubelet/config/file.go +++ b/pkg/kubelet/config/file.go @@ -30,30 +30,46 @@ import ( "k8s.io/api/core/v1" "k8s.io/apimachinery/pkg/types" - "k8s.io/apimachinery/pkg/util/wait" "k8s.io/client-go/tools/cache" api "k8s.io/kubernetes/pkg/apis/core" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" ) +type podEventType int + +const ( + podAdd podEventType = iota + podModify + podDelete + + eventBufferLen = 10 +) + +type watchEvent struct { + fileName string + eventType podEventType +} + type sourceFile struct { path string nodeName types.NodeName + period time.Duration store cache.Store fileKeyMapping map[string]string updates chan<- interface{} + watchEvents chan *watchEvent } func NewSourceFile(path string, nodeName types.NodeName, period time.Duration, updates chan<- interface{}) { // "golang.org/x/exp/inotify" requires a path without trailing "/" path = strings.TrimRight(path, string(os.PathSeparator)) - config := newSourceFile(path, nodeName, updates) + config := newSourceFile(path, nodeName, period, updates) glog.V(1).Infof("Watching path %q", path) - go wait.Forever(config.run, period) + config.run() } -func newSourceFile(path string, nodeName types.NodeName, updates chan<- interface{}) *sourceFile { +func newSourceFile(path string, nodeName types.NodeName, period time.Duration, updates chan<- interface{}) *sourceFile { send := func(objs []interface{}) { var pods []*v1.Pod for _, o := range objs { @@ -65,23 +81,40 @@ func newSourceFile(path string, nodeName types.NodeName, updates chan<- interfac return &sourceFile{ path: path, nodeName: nodeName, + period: period, store: store, fileKeyMapping: map[string]string{}, updates: updates, + watchEvents: make(chan *watchEvent, eventBufferLen), } } func (s *sourceFile) run() { - if err := s.watch(); err != nil { - glog.Errorf("Unable to read manifest path %q: %v", s.path, err) - } + listTicker := time.NewTicker(s.period) + + go func() { + for { + select { + case <-listTicker.C: + if err := s.listConfig(); err != nil { + glog.Errorf("Unable to read config path %q: %v", s.path, err) + } + case e := <-s.watchEvents: + if err := s.consumeWatchEvent(e); err != nil { + glog.Errorf("Unable to process watch event: %v", err) + } + } + } + }() + + s.startWatch() } func (s *sourceFile) applyDefaults(pod *api.Pod, source string) error { return applyDefaults(pod, source, true, s.nodeName) } -func (s *sourceFile) resetStoreFromPath() error { +func (s *sourceFile) listConfig() error { path := s.path statInfo, err := os.Stat(path) if err != nil { @@ -158,7 +191,7 @@ func (s *sourceFile) extractFromDir(name string) ([]*v1.Pod, error) { } func (s *sourceFile) extractFromFile(filename string) (pod *v1.Pod, err error) { - glog.V(3).Infof("Reading manifest file %q", filename) + glog.V(3).Infof("Reading config file %q", filename) defer func() { if err == nil && pod != nil { objKey, keyErr := cache.MetaNamespaceKeyFunc(pod) @@ -193,7 +226,7 @@ func (s *sourceFile) extractFromFile(filename string) (pod *v1.Pod, err error) { return pod, nil } - return pod, fmt.Errorf("%v: couldn't parse as pod(%v), please check manifest file.\n", filename, podErr) + return pod, fmt.Errorf("%v: couldn't parse as pod(%v), please check config file.\n", filename, podErr) } func (s *sourceFile) replaceStore(pods ...*v1.Pod) (err error) { diff --git a/pkg/kubelet/config/file_linux.go b/pkg/kubelet/config/file_linux.go index 315c80a6864..4e6b1a5a237 100644 --- a/pkg/kubelet/config/file_linux.go +++ b/pkg/kubelet/config/file_linux.go @@ -24,23 +24,49 @@ import ( "os" "path/filepath" "strings" + "time" "github.com/golang/glog" "golang.org/x/exp/inotify" "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/util/wait" + "k8s.io/client-go/util/flowcontrol" kubetypes "k8s.io/kubernetes/pkg/kubelet/types" ) -type podEventType int - const ( - podAdd podEventType = iota - podModify - podDelete + retryPeriod = 1 * time.Second + maxRetryPeriod = 20 * time.Second ) -func (s *sourceFile) watch() error { +type retryableError struct { + message string +} + +func (e *retryableError) Error() string { + return e.message +} + +func (s *sourceFile) startWatch() { + backOff := flowcontrol.NewBackOff(retryPeriod, maxRetryPeriod) + backOffId := "watch" + + go wait.Forever(func() { + if backOff.IsInBackOffSinceUpdate(backOffId, time.Now()) { + return + } + + if err := s.doWatch(); err != nil { + glog.Errorf("Unable to read config path %q: %v", s.path, err) + if _, retryable := err.(*retryableError); !retryable { + backOff.Next(backOffId, time.Now()) + } + } + }, retryPeriod) +} + +func (s *sourceFile) doWatch() error { _, err := os.Stat(s.path) if err != nil { if !os.IsNotExist(err) { @@ -48,7 +74,7 @@ func (s *sourceFile) watch() error { } // Emit an update with an empty PodList to allow FileSource to be marked as seen s.updates <- kubetypes.PodUpdate{Pods: []*v1.Pod{}, Op: kubetypes.SET, Source: kubetypes.FileSource} - return fmt.Errorf("path does not exist, ignoring") + return &retryableError{"path does not exist, ignoring"} } w, err := inotify.NewWatcher() @@ -57,22 +83,16 @@ func (s *sourceFile) watch() error { } defer w.Close() - err = w.AddWatch(s.path, inotify.IN_DELETE_SELF|inotify.IN_CREATE|inotify.IN_MOVED_TO|inotify.IN_MODIFY|inotify.IN_MOVED_FROM|inotify.IN_DELETE) + err = w.AddWatch(s.path, inotify.IN_DELETE_SELF|inotify.IN_CREATE|inotify.IN_MOVED_TO|inotify.IN_MODIFY|inotify.IN_MOVED_FROM|inotify.IN_DELETE|inotify.IN_ATTRIB) if err != nil { return fmt.Errorf("unable to create inotify for path %q: %v", s.path, err) } - // Reset store with manifest files already existing when starting - if err := s.resetStoreFromPath(); err != nil { - return fmt.Errorf("unable to read manifest path %q: %v", s.path, err) - } - for { select { case event := <-w.Event: - err = s.processEvent(event) - if err != nil { - return fmt.Errorf("error while processing event (%+v): %v", event, err) + if err = s.produceWatchEvent(event); err != nil { + return fmt.Errorf("error while processing inotify event (%+v): %v", event, err) } case err = <-w.Error: return fmt.Errorf("error while watching %q: %v", s.path, err) @@ -80,7 +100,7 @@ func (s *sourceFile) watch() error { } } -func (s *sourceFile) processEvent(e *inotify.Event) error { +func (s *sourceFile) produceWatchEvent(e *inotify.Event) error { // Ignore file start with dots if strings.HasPrefix(filepath.Base(e.Name), ".") { glog.V(4).Infof("Ignored pod manifest: %s, because it starts with dots", e.Name) @@ -97,6 +117,8 @@ func (s *sourceFile) processEvent(e *inotify.Event) error { eventType = podAdd case (e.Mask & inotify.IN_MODIFY) > 0: eventType = podModify + case (e.Mask & inotify.IN_ATTRIB) > 0: + eventType = podModify case (e.Mask & inotify.IN_DELETE) > 0: eventType = podDelete case (e.Mask & inotify.IN_MOVED_FROM) > 0: @@ -108,22 +130,31 @@ func (s *sourceFile) processEvent(e *inotify.Event) error { return nil } - switch eventType { + s.watchEvents <- &watchEvent{e.Name, eventType} + return nil +} + +func (s *sourceFile) consumeWatchEvent(e *watchEvent) error { + switch e.eventType { case podAdd, podModify: - if pod, err := s.extractFromFile(e.Name); err != nil { - glog.Errorf("Can't process manifest file %q: %v", e.Name, err) + if pod, err := s.extractFromFile(e.fileName); err != nil { + return fmt.Errorf("can't process config file %q: %v", e.fileName, err) } else { return s.store.Add(pod) } case podDelete: - if objKey, keyExist := s.fileKeyMapping[e.Name]; keyExist { + if objKey, keyExist := s.fileKeyMapping[e.fileName]; keyExist { pod, podExist, err := s.store.GetByKey(objKey) if err != nil { return err } else if !podExist { return fmt.Errorf("the pod with key %s doesn't exist in cache", objKey) } else { - return s.store.Delete(pod) + if err = s.store.Delete(pod); err != nil { + return fmt.Errorf("failed to remove deleted pod from cache: %v", err) + } else { + delete(s.fileKeyMapping, e.fileName) + } } } } diff --git a/pkg/kubelet/config/file_linux_test.go b/pkg/kubelet/config/file_linux_test.go index c1b133977ac..55e238dea56 100644 --- a/pkg/kubelet/config/file_linux_test.go +++ b/pkg/kubelet/config/file_linux_test.go @@ -21,7 +21,6 @@ package config import ( "fmt" "io" - "io/ioutil" "os" "os/exec" "path/filepath" @@ -35,7 +34,6 @@ import ( "k8s.io/apimachinery/pkg/runtime" "k8s.io/apimachinery/pkg/types" "k8s.io/apimachinery/pkg/util/wait" - utiltesting "k8s.io/client-go/util/testing" "k8s.io/kubernetes/pkg/api/testapi" api "k8s.io/kubernetes/pkg/apis/core" k8s_api_v1 "k8s.io/kubernetes/pkg/apis/core/v1" @@ -46,8 +44,8 @@ import ( func TestExtractFromNonExistentFile(t *testing.T) { ch := make(chan interface{}, 1) - c := newSourceFile("/some/fake/file", "localhost", ch) - err := c.watch() + lw := newSourceFile("/some/fake/file", "localhost", time.Millisecond, ch) + err := lw.doWatch() if err == nil { t.Errorf("Expected error") } @@ -75,7 +73,7 @@ func TestReadPodsFromFileExistAlready(t *testing.T) { for _, testCase := range testCases { func() { - dirName, err := utiltesting.MkTmpdir("file-test") + dirName, err := mkTempDir("file-test") if err != nil { t.Fatalf("unable to create temp dir: %v", err) } @@ -107,69 +105,35 @@ func TestReadPodsFromFileExistAlready(t *testing.T) { } } -func TestReadPodsFromFileExistLater(t *testing.T) { - watchFileAdded(false, t) +var ( + testCases = []struct { + watchDir bool + symlink bool + }{ + {true, true}, + {true, false}, + {false, true}, + {false, false}, + } +) + +func TestWatchFileAdded(t *testing.T) { + for _, testCase := range testCases { + watchFileAdded(testCase.watchDir, testCase.symlink, t) + } } -func TestReadPodsFromFileChanged(t *testing.T) { - watchFileChanged(false, t) -} - -func TestReadPodsFromFileInDirAdded(t *testing.T) { - watchFileAdded(true, t) -} - -func TestReadPodsFromFileInDirChanged(t *testing.T) { - watchFileChanged(true, t) -} - -func TestExtractFromBadDataFile(t *testing.T) { - dirName, err := utiltesting.MkTmpdir("file-test") - if err != nil { - t.Fatalf("unable to create temp dir: %v", err) - } - defer os.RemoveAll(dirName) - - fileName := filepath.Join(dirName, "test_pod_manifest") - err = ioutil.WriteFile(fileName, []byte{1, 2, 3}, 0555) - if err != nil { - t.Fatalf("unable to write test file %#v", err) - } - - ch := make(chan interface{}, 1) - c := newSourceFile(fileName, "localhost", ch) - err = c.resetStoreFromPath() - if err == nil { - t.Fatalf("expected error, got nil") - } - expectEmptyChannel(t, ch) -} - -func TestExtractFromEmptyDir(t *testing.T) { - dirName, err := utiltesting.MkTmpdir("file-test") - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - defer os.RemoveAll(dirName) - - ch := make(chan interface{}, 1) - c := newSourceFile(dirName, "localhost", ch) - err = c.resetStoreFromPath() - if err != nil { - t.Fatalf("unexpected error: %v", err) - } - - update := (<-ch).(kubetypes.PodUpdate) - expected := CreatePodUpdate(kubetypes.SET, kubetypes.FileSource) - if !apiequality.Semantic.DeepEqual(expected, update) { - t.Fatalf("expected %#v, Got %#v", expected, update) +func TestWatchFileChanged(t *testing.T) { + for _, testCase := range testCases { + watchFileChanged(testCase.watchDir, testCase.symlink, t) } } type testCase struct { - desc string - pod runtime.Object - expected kubetypes.PodUpdate + desc string + linkedFile string + pod runtime.Object + expected kubetypes.PodUpdate } func getTestCases(hostname types.NodeName) []*testCase { @@ -250,19 +214,40 @@ func (tc *testCase) writeToFile(dir, name string, t *testing.T) string { return fileName } -func watchFileAdded(watchDir bool, t *testing.T) { +func createSymbolicLink(link, target, name string, t *testing.T) string { + linkName := filepath.Join(link, name) + linkedFile := filepath.Join(target, name) + + err := os.Symlink(linkedFile, linkName) + if err != nil { + t.Fatalf("unexpected error when create symbolic link: %v", err) + } + return linkName +} + +func watchFileAdded(watchDir bool, symlink bool, t *testing.T) { hostname := types.NodeName("random-test-hostname") var testCases = getTestCases(hostname) fileNamePre := "test_pod_manifest" for index, testCase := range testCases { func() { - dirName, err := utiltesting.MkTmpdir("dir-test") + dirName, err := mkTempDir("dir-test") if err != nil { t.Fatalf("unable to create temp dir: %v", err) } - defer os.RemoveAll(dirName) + defer removeAll(dirName, t) + fileName := fmt.Sprintf("%s_%d", fileNamePre, index) + var linkedDirName string + if symlink { + linkedDirName, err = mkTempDir("linked-dir-test") + if err != nil { + t.Fatalf("unable to create temp dir for linked files: %v", err) + } + defer removeAll(linkedDirName, t) + createSymbolicLink(dirName, linkedDirName, fileName, t) + } ch := make(chan interface{}) if watchDir { @@ -274,12 +259,17 @@ func watchFileAdded(watchDir bool, t *testing.T) { addFile := func() { // Add a file + if symlink { + testCase.writeToFile(linkedDirName, fileName, t) + return + } + testCase.writeToFile(dirName, fileName, t) } go addFile() - // For !watchDir: expect an update by SourceFile.resetStoreFromPath(). + // For !watchDir: expect an update by SourceFile.reloadConfig(). // For watchDir: expect at least one update from CREATE & MODIFY inotify event. // Shouldn't expect two updates from CREATE & MODIFY because CREATE doesn't guarantee file written. // In that case no update will be sent from CREATE event. @@ -288,19 +278,29 @@ func watchFileAdded(watchDir bool, t *testing.T) { } } -func watchFileChanged(watchDir bool, t *testing.T) { +func watchFileChanged(watchDir bool, symlink bool, t *testing.T) { hostname := types.NodeName("random-test-hostname") var testCases = getTestCases(hostname) fileNamePre := "test_pod_manifest" for index, testCase := range testCases { func() { - dirName, err := utiltesting.MkTmpdir("dir-test") + dirName, err := mkTempDir("dir-test") fileName := fmt.Sprintf("%s_%d", fileNamePre, index) if err != nil { t.Fatalf("unable to create temp dir: %v", err) } - defer os.RemoveAll(dirName) + defer removeAll(dirName, t) + + var linkedDirName string + if symlink { + linkedDirName, err = mkTempDir("linked-dir-test") + if err != nil { + t.Fatalf("unable to create temp dir for linked files: %v", err) + } + defer removeAll(linkedDirName, t) + createSymbolicLink(dirName, linkedDirName, fileName, t) + } var file string lock := &sync.Mutex{} @@ -308,6 +308,12 @@ func watchFileChanged(watchDir bool, t *testing.T) { func() { lock.Lock() defer lock.Unlock() + + if symlink { + file = testCase.writeToFile(linkedDirName, fileName, t) + return + } + file = testCase.writeToFile(dirName, fileName, t) }() @@ -332,7 +338,12 @@ func watchFileChanged(watchDir bool, t *testing.T) { pod.Spec.Containers[0].Name = "image2" testCase.expected.Pods[0].Spec.Containers[0].Name = "image2" - testCase.writeToFile(dirName, fileName, t) + if symlink { + file = testCase.writeToFile(linkedDirName, fileName, t) + return + } + + file = testCase.writeToFile(dirName, fileName, t) } go changeFile() @@ -370,6 +381,10 @@ func expectUpdate(t *testing.T, ch chan interface{}, testCase *testCase) { select { case got := <-ch: update := got.(kubetypes.PodUpdate) + if len(update.Pods) == 0 { + // filter out the empty updates from reading a non-existing path + continue + } for _, pod := range update.Pods { // TODO: remove the conversion when validation is performed on versioned objects. internalPod := &api.Pod{} diff --git a/pkg/kubelet/config/file_test.go b/pkg/kubelet/config/file_test.go new file mode 100644 index 00000000000..36c9f210805 --- /dev/null +++ b/pkg/kubelet/config/file_test.go @@ -0,0 +1,84 @@ +/* +Copyright 2016 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package config + +import ( + "io/ioutil" + "os" + "path/filepath" + "testing" + "time" + + apiequality "k8s.io/apimachinery/pkg/api/equality" + kubetypes "k8s.io/kubernetes/pkg/kubelet/types" +) + +func TestExtractFromBadDataFile(t *testing.T) { + dirName, err := mkTempDir("file-test") + if err != nil { + t.Fatalf("unable to create temp dir: %v", err) + } + defer removeAll(dirName, t) + + fileName := filepath.Join(dirName, "test_pod_config") + err = ioutil.WriteFile(fileName, []byte{1, 2, 3}, 0555) + if err != nil { + t.Fatalf("unable to write test file %#v", err) + } + + ch := make(chan interface{}, 1) + lw := newSourceFile(fileName, "localhost", time.Millisecond, ch) + err = lw.listConfig() + if err == nil { + t.Fatalf("expected error, got nil") + } + expectEmptyChannel(t, ch) +} + +func TestExtractFromEmptyDir(t *testing.T) { + dirName, err := mkTempDir("file-test") + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + defer removeAll(dirName, t) + + ch := make(chan interface{}, 1) + lw := newSourceFile(dirName, "localhost", time.Millisecond, ch) + err = lw.listConfig() + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + + update, ok := (<-ch).(kubetypes.PodUpdate) + if !ok { + t.Fatalf("unexpected type: %#v", update) + } + expected := CreatePodUpdate(kubetypes.SET, kubetypes.FileSource) + if !apiequality.Semantic.DeepEqual(expected, update) { + t.Fatalf("expected %#v, got %#v", expected, update) + } +} + +func mkTempDir(prefix string) (string, error) { + return ioutil.TempDir(os.TempDir(), prefix) +} + +func removeAll(dir string, t *testing.T) { + if err := os.RemoveAll(dir); err != nil { + t.Fatalf("unable to remove dir %s: %v", dir, err) + } +} diff --git a/pkg/kubelet/config/file_unsupported.go b/pkg/kubelet/config/file_unsupported.go index c30ede44331..4bee74f544d 100644 --- a/pkg/kubelet/config/file_unsupported.go +++ b/pkg/kubelet/config/file_unsupported.go @@ -19,8 +19,16 @@ limitations under the License. // Reads the pod configuration from file or a directory of files. package config -import "errors" +import ( + "fmt" -func (s *sourceFile) watch() error { - return errors.New("source file is unsupported in this build") + "github.com/golang/glog" +) + +func (s *sourceFile) startWatch() { + glog.Errorf("Watching source file is unsupported in this build") +} + +func (s *sourceFile) consumeWatchEvent(e *watchEvent) error { + return fmt.Errorf("consuming watch event is unsupported in this build") }