Merge pull request #44495 from wu8685/fix-inotify-issue

Automatic merge from submit-queue (batch tested with PRs 62231, 44495, 62199). If you want to cherry-pick this change to another branch, please follow the instructions <a href="https://github.com/kubernetes/community/blob/master/contributors/devel/cherry-picks.md">here</a>.

fix #40123: add a periodical polling to update pod config

Fixes #40123
This commit is contained in:
Kubernetes Submit Queue 2018-04-06 20:32:03 -07:00 committed by GitHub
commit b2494fbda9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 278 additions and 105 deletions

View File

@ -80,6 +80,7 @@ go_library(
] + select({
"@io_bazel_rules_go//go/platform:linux": [
"//vendor/golang.org/x/exp/inotify:go_default_library",
"//vendor/k8s.io/client-go/util/flowcontrol:go_default_library",
],
"//conditions:default": [],
}),
@ -91,6 +92,7 @@ go_test(
"apiserver_test.go",
"common_test.go",
"config_test.go",
"file_test.go",
"http_test.go",
] + select({
"@io_bazel_rules_go//go/platform:linux": [

View File

@ -30,30 +30,46 @@ import (
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/tools/cache"
api "k8s.io/kubernetes/pkg/apis/core"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
)
type podEventType int
const (
podAdd podEventType = iota
podModify
podDelete
eventBufferLen = 10
)
type watchEvent struct {
fileName string
eventType podEventType
}
type sourceFile struct {
path string
nodeName types.NodeName
period time.Duration
store cache.Store
fileKeyMapping map[string]string
updates chan<- interface{}
watchEvents chan *watchEvent
}
func NewSourceFile(path string, nodeName types.NodeName, period time.Duration, updates chan<- interface{}) {
// "golang.org/x/exp/inotify" requires a path without trailing "/"
path = strings.TrimRight(path, string(os.PathSeparator))
config := newSourceFile(path, nodeName, updates)
config := newSourceFile(path, nodeName, period, updates)
glog.V(1).Infof("Watching path %q", path)
go wait.Forever(config.run, period)
config.run()
}
func newSourceFile(path string, nodeName types.NodeName, updates chan<- interface{}) *sourceFile {
func newSourceFile(path string, nodeName types.NodeName, period time.Duration, updates chan<- interface{}) *sourceFile {
send := func(objs []interface{}) {
var pods []*v1.Pod
for _, o := range objs {
@ -65,23 +81,40 @@ func newSourceFile(path string, nodeName types.NodeName, updates chan<- interfac
return &sourceFile{
path: path,
nodeName: nodeName,
period: period,
store: store,
fileKeyMapping: map[string]string{},
updates: updates,
watchEvents: make(chan *watchEvent, eventBufferLen),
}
}
func (s *sourceFile) run() {
if err := s.watch(); err != nil {
glog.Errorf("Unable to read manifest path %q: %v", s.path, err)
}
listTicker := time.NewTicker(s.period)
go func() {
for {
select {
case <-listTicker.C:
if err := s.listConfig(); err != nil {
glog.Errorf("Unable to read config path %q: %v", s.path, err)
}
case e := <-s.watchEvents:
if err := s.consumeWatchEvent(e); err != nil {
glog.Errorf("Unable to process watch event: %v", err)
}
}
}
}()
s.startWatch()
}
func (s *sourceFile) applyDefaults(pod *api.Pod, source string) error {
return applyDefaults(pod, source, true, s.nodeName)
}
func (s *sourceFile) resetStoreFromPath() error {
func (s *sourceFile) listConfig() error {
path := s.path
statInfo, err := os.Stat(path)
if err != nil {
@ -158,7 +191,7 @@ func (s *sourceFile) extractFromDir(name string) ([]*v1.Pod, error) {
}
func (s *sourceFile) extractFromFile(filename string) (pod *v1.Pod, err error) {
glog.V(3).Infof("Reading manifest file %q", filename)
glog.V(3).Infof("Reading config file %q", filename)
defer func() {
if err == nil && pod != nil {
objKey, keyErr := cache.MetaNamespaceKeyFunc(pod)
@ -193,7 +226,7 @@ func (s *sourceFile) extractFromFile(filename string) (pod *v1.Pod, err error) {
return pod, nil
}
return pod, fmt.Errorf("%v: couldn't parse as pod(%v), please check manifest file.\n", filename, podErr)
return pod, fmt.Errorf("%v: couldn't parse as pod(%v), please check config file.\n", filename, podErr)
}
func (s *sourceFile) replaceStore(pods ...*v1.Pod) (err error) {

View File

@ -24,23 +24,49 @@ import (
"os"
"path/filepath"
"strings"
"time"
"github.com/golang/glog"
"golang.org/x/exp/inotify"
"k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/flowcontrol"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
)
type podEventType int
const (
podAdd podEventType = iota
podModify
podDelete
retryPeriod = 1 * time.Second
maxRetryPeriod = 20 * time.Second
)
func (s *sourceFile) watch() error {
type retryableError struct {
message string
}
func (e *retryableError) Error() string {
return e.message
}
func (s *sourceFile) startWatch() {
backOff := flowcontrol.NewBackOff(retryPeriod, maxRetryPeriod)
backOffId := "watch"
go wait.Forever(func() {
if backOff.IsInBackOffSinceUpdate(backOffId, time.Now()) {
return
}
if err := s.doWatch(); err != nil {
glog.Errorf("Unable to read config path %q: %v", s.path, err)
if _, retryable := err.(*retryableError); !retryable {
backOff.Next(backOffId, time.Now())
}
}
}, retryPeriod)
}
func (s *sourceFile) doWatch() error {
_, err := os.Stat(s.path)
if err != nil {
if !os.IsNotExist(err) {
@ -48,7 +74,7 @@ func (s *sourceFile) watch() error {
}
// Emit an update with an empty PodList to allow FileSource to be marked as seen
s.updates <- kubetypes.PodUpdate{Pods: []*v1.Pod{}, Op: kubetypes.SET, Source: kubetypes.FileSource}
return fmt.Errorf("path does not exist, ignoring")
return &retryableError{"path does not exist, ignoring"}
}
w, err := inotify.NewWatcher()
@ -57,22 +83,16 @@ func (s *sourceFile) watch() error {
}
defer w.Close()
err = w.AddWatch(s.path, inotify.IN_DELETE_SELF|inotify.IN_CREATE|inotify.IN_MOVED_TO|inotify.IN_MODIFY|inotify.IN_MOVED_FROM|inotify.IN_DELETE)
err = w.AddWatch(s.path, inotify.IN_DELETE_SELF|inotify.IN_CREATE|inotify.IN_MOVED_TO|inotify.IN_MODIFY|inotify.IN_MOVED_FROM|inotify.IN_DELETE|inotify.IN_ATTRIB)
if err != nil {
return fmt.Errorf("unable to create inotify for path %q: %v", s.path, err)
}
// Reset store with manifest files already existing when starting
if err := s.resetStoreFromPath(); err != nil {
return fmt.Errorf("unable to read manifest path %q: %v", s.path, err)
}
for {
select {
case event := <-w.Event:
err = s.processEvent(event)
if err != nil {
return fmt.Errorf("error while processing event (%+v): %v", event, err)
if err = s.produceWatchEvent(event); err != nil {
return fmt.Errorf("error while processing inotify event (%+v): %v", event, err)
}
case err = <-w.Error:
return fmt.Errorf("error while watching %q: %v", s.path, err)
@ -80,7 +100,7 @@ func (s *sourceFile) watch() error {
}
}
func (s *sourceFile) processEvent(e *inotify.Event) error {
func (s *sourceFile) produceWatchEvent(e *inotify.Event) error {
// Ignore file start with dots
if strings.HasPrefix(filepath.Base(e.Name), ".") {
glog.V(4).Infof("Ignored pod manifest: %s, because it starts with dots", e.Name)
@ -97,6 +117,8 @@ func (s *sourceFile) processEvent(e *inotify.Event) error {
eventType = podAdd
case (e.Mask & inotify.IN_MODIFY) > 0:
eventType = podModify
case (e.Mask & inotify.IN_ATTRIB) > 0:
eventType = podModify
case (e.Mask & inotify.IN_DELETE) > 0:
eventType = podDelete
case (e.Mask & inotify.IN_MOVED_FROM) > 0:
@ -108,22 +130,31 @@ func (s *sourceFile) processEvent(e *inotify.Event) error {
return nil
}
switch eventType {
s.watchEvents <- &watchEvent{e.Name, eventType}
return nil
}
func (s *sourceFile) consumeWatchEvent(e *watchEvent) error {
switch e.eventType {
case podAdd, podModify:
if pod, err := s.extractFromFile(e.Name); err != nil {
glog.Errorf("Can't process manifest file %q: %v", e.Name, err)
if pod, err := s.extractFromFile(e.fileName); err != nil {
return fmt.Errorf("can't process config file %q: %v", e.fileName, err)
} else {
return s.store.Add(pod)
}
case podDelete:
if objKey, keyExist := s.fileKeyMapping[e.Name]; keyExist {
if objKey, keyExist := s.fileKeyMapping[e.fileName]; keyExist {
pod, podExist, err := s.store.GetByKey(objKey)
if err != nil {
return err
} else if !podExist {
return fmt.Errorf("the pod with key %s doesn't exist in cache", objKey)
} else {
return s.store.Delete(pod)
if err = s.store.Delete(pod); err != nil {
return fmt.Errorf("failed to remove deleted pod from cache: %v", err)
} else {
delete(s.fileKeyMapping, e.fileName)
}
}
}
}

View File

@ -21,7 +21,6 @@ package config
import (
"fmt"
"io"
"io/ioutil"
"os"
"os/exec"
"path/filepath"
@ -35,7 +34,6 @@ import (
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/wait"
utiltesting "k8s.io/client-go/util/testing"
"k8s.io/kubernetes/pkg/api/testapi"
api "k8s.io/kubernetes/pkg/apis/core"
k8s_api_v1 "k8s.io/kubernetes/pkg/apis/core/v1"
@ -46,8 +44,8 @@ import (
func TestExtractFromNonExistentFile(t *testing.T) {
ch := make(chan interface{}, 1)
c := newSourceFile("/some/fake/file", "localhost", ch)
err := c.watch()
lw := newSourceFile("/some/fake/file", "localhost", time.Millisecond, ch)
err := lw.doWatch()
if err == nil {
t.Errorf("Expected error")
}
@ -75,7 +73,7 @@ func TestReadPodsFromFileExistAlready(t *testing.T) {
for _, testCase := range testCases {
func() {
dirName, err := utiltesting.MkTmpdir("file-test")
dirName, err := mkTempDir("file-test")
if err != nil {
t.Fatalf("unable to create temp dir: %v", err)
}
@ -107,69 +105,35 @@ func TestReadPodsFromFileExistAlready(t *testing.T) {
}
}
func TestReadPodsFromFileExistLater(t *testing.T) {
watchFileAdded(false, t)
var (
testCases = []struct {
watchDir bool
symlink bool
}{
{true, true},
{true, false},
{false, true},
{false, false},
}
)
func TestWatchFileAdded(t *testing.T) {
for _, testCase := range testCases {
watchFileAdded(testCase.watchDir, testCase.symlink, t)
}
}
func TestReadPodsFromFileChanged(t *testing.T) {
watchFileChanged(false, t)
}
func TestReadPodsFromFileInDirAdded(t *testing.T) {
watchFileAdded(true, t)
}
func TestReadPodsFromFileInDirChanged(t *testing.T) {
watchFileChanged(true, t)
}
func TestExtractFromBadDataFile(t *testing.T) {
dirName, err := utiltesting.MkTmpdir("file-test")
if err != nil {
t.Fatalf("unable to create temp dir: %v", err)
}
defer os.RemoveAll(dirName)
fileName := filepath.Join(dirName, "test_pod_manifest")
err = ioutil.WriteFile(fileName, []byte{1, 2, 3}, 0555)
if err != nil {
t.Fatalf("unable to write test file %#v", err)
}
ch := make(chan interface{}, 1)
c := newSourceFile(fileName, "localhost", ch)
err = c.resetStoreFromPath()
if err == nil {
t.Fatalf("expected error, got nil")
}
expectEmptyChannel(t, ch)
}
func TestExtractFromEmptyDir(t *testing.T) {
dirName, err := utiltesting.MkTmpdir("file-test")
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
defer os.RemoveAll(dirName)
ch := make(chan interface{}, 1)
c := newSourceFile(dirName, "localhost", ch)
err = c.resetStoreFromPath()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
update := (<-ch).(kubetypes.PodUpdate)
expected := CreatePodUpdate(kubetypes.SET, kubetypes.FileSource)
if !apiequality.Semantic.DeepEqual(expected, update) {
t.Fatalf("expected %#v, Got %#v", expected, update)
func TestWatchFileChanged(t *testing.T) {
for _, testCase := range testCases {
watchFileChanged(testCase.watchDir, testCase.symlink, t)
}
}
type testCase struct {
desc string
pod runtime.Object
expected kubetypes.PodUpdate
desc string
linkedFile string
pod runtime.Object
expected kubetypes.PodUpdate
}
func getTestCases(hostname types.NodeName) []*testCase {
@ -250,19 +214,40 @@ func (tc *testCase) writeToFile(dir, name string, t *testing.T) string {
return fileName
}
func watchFileAdded(watchDir bool, t *testing.T) {
func createSymbolicLink(link, target, name string, t *testing.T) string {
linkName := filepath.Join(link, name)
linkedFile := filepath.Join(target, name)
err := os.Symlink(linkedFile, linkName)
if err != nil {
t.Fatalf("unexpected error when create symbolic link: %v", err)
}
return linkName
}
func watchFileAdded(watchDir bool, symlink bool, t *testing.T) {
hostname := types.NodeName("random-test-hostname")
var testCases = getTestCases(hostname)
fileNamePre := "test_pod_manifest"
for index, testCase := range testCases {
func() {
dirName, err := utiltesting.MkTmpdir("dir-test")
dirName, err := mkTempDir("dir-test")
if err != nil {
t.Fatalf("unable to create temp dir: %v", err)
}
defer os.RemoveAll(dirName)
defer removeAll(dirName, t)
fileName := fmt.Sprintf("%s_%d", fileNamePre, index)
var linkedDirName string
if symlink {
linkedDirName, err = mkTempDir("linked-dir-test")
if err != nil {
t.Fatalf("unable to create temp dir for linked files: %v", err)
}
defer removeAll(linkedDirName, t)
createSymbolicLink(dirName, linkedDirName, fileName, t)
}
ch := make(chan interface{})
if watchDir {
@ -274,12 +259,17 @@ func watchFileAdded(watchDir bool, t *testing.T) {
addFile := func() {
// Add a file
if symlink {
testCase.writeToFile(linkedDirName, fileName, t)
return
}
testCase.writeToFile(dirName, fileName, t)
}
go addFile()
// For !watchDir: expect an update by SourceFile.resetStoreFromPath().
// For !watchDir: expect an update by SourceFile.reloadConfig().
// For watchDir: expect at least one update from CREATE & MODIFY inotify event.
// Shouldn't expect two updates from CREATE & MODIFY because CREATE doesn't guarantee file written.
// In that case no update will be sent from CREATE event.
@ -288,19 +278,29 @@ func watchFileAdded(watchDir bool, t *testing.T) {
}
}
func watchFileChanged(watchDir bool, t *testing.T) {
func watchFileChanged(watchDir bool, symlink bool, t *testing.T) {
hostname := types.NodeName("random-test-hostname")
var testCases = getTestCases(hostname)
fileNamePre := "test_pod_manifest"
for index, testCase := range testCases {
func() {
dirName, err := utiltesting.MkTmpdir("dir-test")
dirName, err := mkTempDir("dir-test")
fileName := fmt.Sprintf("%s_%d", fileNamePre, index)
if err != nil {
t.Fatalf("unable to create temp dir: %v", err)
}
defer os.RemoveAll(dirName)
defer removeAll(dirName, t)
var linkedDirName string
if symlink {
linkedDirName, err = mkTempDir("linked-dir-test")
if err != nil {
t.Fatalf("unable to create temp dir for linked files: %v", err)
}
defer removeAll(linkedDirName, t)
createSymbolicLink(dirName, linkedDirName, fileName, t)
}
var file string
lock := &sync.Mutex{}
@ -308,6 +308,12 @@ func watchFileChanged(watchDir bool, t *testing.T) {
func() {
lock.Lock()
defer lock.Unlock()
if symlink {
file = testCase.writeToFile(linkedDirName, fileName, t)
return
}
file = testCase.writeToFile(dirName, fileName, t)
}()
@ -332,7 +338,12 @@ func watchFileChanged(watchDir bool, t *testing.T) {
pod.Spec.Containers[0].Name = "image2"
testCase.expected.Pods[0].Spec.Containers[0].Name = "image2"
testCase.writeToFile(dirName, fileName, t)
if symlink {
file = testCase.writeToFile(linkedDirName, fileName, t)
return
}
file = testCase.writeToFile(dirName, fileName, t)
}
go changeFile()
@ -370,6 +381,10 @@ func expectUpdate(t *testing.T, ch chan interface{}, testCase *testCase) {
select {
case got := <-ch:
update := got.(kubetypes.PodUpdate)
if len(update.Pods) == 0 {
// filter out the empty updates from reading a non-existing path
continue
}
for _, pod := range update.Pods {
// TODO: remove the conversion when validation is performed on versioned objects.
internalPod := &api.Pod{}

View File

@ -0,0 +1,84 @@
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package config
import (
"io/ioutil"
"os"
"path/filepath"
"testing"
"time"
apiequality "k8s.io/apimachinery/pkg/api/equality"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
)
func TestExtractFromBadDataFile(t *testing.T) {
dirName, err := mkTempDir("file-test")
if err != nil {
t.Fatalf("unable to create temp dir: %v", err)
}
defer removeAll(dirName, t)
fileName := filepath.Join(dirName, "test_pod_config")
err = ioutil.WriteFile(fileName, []byte{1, 2, 3}, 0555)
if err != nil {
t.Fatalf("unable to write test file %#v", err)
}
ch := make(chan interface{}, 1)
lw := newSourceFile(fileName, "localhost", time.Millisecond, ch)
err = lw.listConfig()
if err == nil {
t.Fatalf("expected error, got nil")
}
expectEmptyChannel(t, ch)
}
func TestExtractFromEmptyDir(t *testing.T) {
dirName, err := mkTempDir("file-test")
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
defer removeAll(dirName, t)
ch := make(chan interface{}, 1)
lw := newSourceFile(dirName, "localhost", time.Millisecond, ch)
err = lw.listConfig()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
update, ok := (<-ch).(kubetypes.PodUpdate)
if !ok {
t.Fatalf("unexpected type: %#v", update)
}
expected := CreatePodUpdate(kubetypes.SET, kubetypes.FileSource)
if !apiequality.Semantic.DeepEqual(expected, update) {
t.Fatalf("expected %#v, got %#v", expected, update)
}
}
func mkTempDir(prefix string) (string, error) {
return ioutil.TempDir(os.TempDir(), prefix)
}
func removeAll(dir string, t *testing.T) {
if err := os.RemoveAll(dir); err != nil {
t.Fatalf("unable to remove dir %s: %v", dir, err)
}
}

View File

@ -19,8 +19,16 @@ limitations under the License.
// Reads the pod configuration from file or a directory of files.
package config
import "errors"
import (
"fmt"
func (s *sourceFile) watch() error {
return errors.New("source file is unsupported in this build")
"github.com/golang/glog"
)
func (s *sourceFile) startWatch() {
glog.Errorf("Watching source file is unsupported in this build")
}
func (s *sourceFile) consumeWatchEvent(e *watchEvent) error {
return fmt.Errorf("consuming watch event is unsupported in this build")
}