Merge pull request #27832 from wu8685/k8s-inotify

Automatic merge from submit-queue

kubelet detects pod manifest files in the directory using inotify #27137
This commit is contained in:
Kubernetes Submit Queue 2016-09-28 20:45:50 -07:00 committed by GitHub
commit baa4bf763c
5 changed files with 639 additions and 218 deletions

View File

@ -1,5 +1,5 @@
/*
Copyright 2014 The Kubernetes Authors.
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
@ -25,33 +25,50 @@ import (
"sort"
"time"
"k8s.io/kubernetes/pkg/api"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/util/wait"
"github.com/golang/glog"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/client/cache"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/types"
"k8s.io/kubernetes/pkg/util/wait"
)
type sourceFile struct {
path string
nodeName types.NodeName
updates chan<- interface{}
path string
nodeName types.NodeName
store cache.Store
fileKeyMapping map[string]string
updates chan<- interface{}
}
func NewSourceFile(path string, nodeName types.NodeName, period time.Duration, updates chan<- interface{}) {
config := &sourceFile{
path: path,
nodeName: nodeName,
updates: updates,
}
config := new(path, nodeName, period, updates)
glog.V(1).Infof("Watching path %q", path)
go wait.Until(config.run, period, wait.NeverStop)
go wait.Forever(config.run, period)
}
func new(path string, nodeName types.NodeName, period time.Duration, updates chan<- interface{}) *sourceFile {
send := func(objs []interface{}) {
var pods []*api.Pod
for _, o := range objs {
pods = append(pods, o.(*api.Pod))
}
updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.FileSource}
}
store := cache.NewUndeltaStore(send, cache.MetaNamespaceKeyFunc)
return &sourceFile{
path: path,
nodeName: nodeName,
store: store,
fileKeyMapping: map[string]string{},
updates: updates,
}
}
func (s *sourceFile) run() {
if err := s.extractFromPath(); err != nil {
glog.Errorf("Unable to read config path %q: %v", s.path, err)
if err := s.watch(); err != nil {
glog.Errorf("unable to read config path %q: %v", s.path, err)
}
}
@ -59,7 +76,7 @@ func (s *sourceFile) applyDefaults(pod *api.Pod, source string) error {
return applyDefaults(pod, source, true, s.nodeName)
}
func (s *sourceFile) extractFromPath() error {
func (s *sourceFile) resetStoreFromPath() error {
path := s.path
statInfo, err := os.Stat(path)
if err != nil {
@ -77,20 +94,23 @@ func (s *sourceFile) extractFromPath() error {
if err != nil {
return err
}
s.updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.FileSource}
if len(pods) == 0 {
// Emit an update with an empty PodList to allow FileSource to be marked as seen
s.updates <- kubetypes.PodUpdate{Pods: pods, Op: kubetypes.SET, Source: kubetypes.FileSource}
return nil
}
return s.replaceStore(pods...)
case statInfo.Mode().IsRegular():
pod, err := s.extractFromFile(path)
if err != nil {
return err
}
s.updates <- kubetypes.PodUpdate{Pods: []*api.Pod{pod}, Op: kubetypes.SET, Source: kubetypes.FileSource}
return s.replaceStore(pod)
default:
return fmt.Errorf("path is not a directory or file")
}
return nil
}
// Get as many pod configs as we can from a directory. Return an error if and only if something
@ -134,6 +154,17 @@ func (s *sourceFile) extractFromDir(name string) ([]*api.Pod, error) {
func (s *sourceFile) extractFromFile(filename string) (pod *api.Pod, err error) {
glog.V(3).Infof("Reading config file %q", filename)
defer func() {
if err == nil && pod != nil {
objKey, keyErr := cache.MetaNamespaceKeyFunc(pod)
if keyErr != nil {
err = keyErr
return
}
s.fileKeyMapping[filename] = objKey
}
}()
file, err := os.Open(filename)
if err != nil {
return pod, err
@ -160,3 +191,11 @@ func (s *sourceFile) extractFromFile(filename string) (pod *api.Pod, err error)
return pod, fmt.Errorf("%v: read '%v', but couldn't parse as pod(%v).\n",
filename, string(data), podErr)
}
func (s *sourceFile) replaceStore(pods ...*api.Pod) (err error) {
objs := []interface{}{}
for _, pod := range pods {
objs = append(objs, pod)
}
return s.store.Replace(objs, "")
}

View File

@ -0,0 +1,124 @@
// +build linux
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Reads the pod configuration from file or a directory of files.
package config
import (
"fmt"
"os"
"github.com/golang/glog"
"golang.org/x/exp/inotify"
"k8s.io/kubernetes/pkg/api"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
)
type podEventType int
const (
podAdd podEventType = iota
podModify
podDelete
)
func (s *sourceFile) watch() error {
_, err := os.Stat(s.path)
if err != nil {
if !os.IsNotExist(err) {
return err
}
// Emit an update with an empty PodList to allow FileSource to be marked as seen
s.updates <- kubetypes.PodUpdate{Pods: []*api.Pod{}, Op: kubetypes.SET, Source: kubetypes.FileSource}
return fmt.Errorf("path does not exist, ignoring")
}
w, err := inotify.NewWatcher()
if err != nil {
return fmt.Errorf("unable to create inotify: %v", err)
}
defer w.Close()
err = w.AddWatch(s.path, inotify.IN_DELETE_SELF|inotify.IN_CREATE|inotify.IN_MOVED_TO|inotify.IN_MODIFY|inotify.IN_MOVED_FROM|inotify.IN_DELETE)
if err != nil {
return fmt.Errorf("unable to create inotify for path %q: %v", s.path, err)
}
// Reset store with config files already existing when starting
if err := s.resetStoreFromPath(); err != nil {
return fmt.Errorf("unable to read config path %q: %v", s.path, err)
}
for {
select {
case event := <-w.Event:
err = s.processEvent(event)
if err != nil {
return fmt.Errorf("error while processing event (%+v): %v", event, err)
}
case err = <-w.Error:
return fmt.Errorf("error while watching %q: %v", s.path, err)
}
}
}
func (s *sourceFile) processEvent(e *inotify.Event) error {
var eventType podEventType
switch {
case (e.Mask & inotify.IN_ISDIR) > 0:
glog.V(1).Infof("Not recursing into config path %q", s.path)
return nil
case (e.Mask & inotify.IN_CREATE) > 0:
eventType = podAdd
case (e.Mask & inotify.IN_MOVED_TO) > 0:
eventType = podAdd
case (e.Mask & inotify.IN_MODIFY) > 0:
eventType = podModify
case (e.Mask & inotify.IN_DELETE) > 0:
eventType = podDelete
case (e.Mask & inotify.IN_MOVED_FROM) > 0:
eventType = podDelete
case (e.Mask & inotify.IN_DELETE_SELF) > 0:
return fmt.Errorf("the watched path is deleted")
default:
// Ignore rest events
return nil
}
switch eventType {
case podAdd, podModify:
if pod, err := s.extractFromFile(e.Name); err != nil {
glog.Errorf("can't process config file %q: %v", e.Name, err)
} else {
return s.store.Add(pod)
}
case podDelete:
if objKey, keyExist := s.fileKeyMapping[e.Name]; keyExist {
pod, podExist, err := s.store.GetByKey(objKey)
if err != nil {
return err
} else if !podExist {
return fmt.Errorf("the pod with key %s doesn't exist in cache", objKey)
} else {
return s.store.Delete(pod)
}
}
}
return nil
}

View File

@ -0,0 +1,429 @@
// +build linux
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package config
import (
"fmt"
"io"
"io/ioutil"
"os"
"os/exec"
"path/filepath"
"sync"
"testing"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/api/validation"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/securitycontext"
"k8s.io/kubernetes/pkg/types"
utiltesting "k8s.io/kubernetes/pkg/util/testing"
"k8s.io/kubernetes/pkg/util/wait"
)
func TestExtractFromNonExistentFile(t *testing.T) {
ch := make(chan interface{}, 1)
c := new("/some/fake/file", "localhost", time.Millisecond, ch)
err := c.watch()
if err == nil {
t.Errorf("Expected error")
}
}
func TestUpdateOnNonExistentFile(t *testing.T) {
ch := make(chan interface{})
NewSourceFile("random_non_existent_path", "localhost", time.Millisecond, ch)
select {
case got := <-ch:
update := got.(kubetypes.PodUpdate)
expected := CreatePodUpdate(kubetypes.SET, kubetypes.FileSource)
if !api.Semantic.DeepDerivative(expected, update) {
t.Fatalf("expected %#v, Got %#v", expected, update)
}
case <-time.After(wait.ForeverTestTimeout):
t.Fatalf("expected update, timeout instead")
}
}
func TestReadPodsFromFileExistAlready(t *testing.T) {
hostname := types.NodeName("random-test-hostname")
var testCases = getTestCases(hostname)
for _, testCase := range testCases {
func() {
dirName, err := utiltesting.MkTmpdir("file-test")
if err != nil {
t.Fatalf("unable to create temp dir: %v", err)
}
defer os.RemoveAll(dirName)
file := testCase.writeToFile(dirName, "test_pod_config", t)
ch := make(chan interface{})
NewSourceFile(file, hostname, time.Millisecond, ch)
select {
case got := <-ch:
update := got.(kubetypes.PodUpdate)
for _, pod := range update.Pods {
if errs := validation.ValidatePod(pod); len(errs) > 0 {
t.Fatalf("%s: Invalid pod %#v, %#v", testCase.desc, pod, errs)
}
}
if !api.Semantic.DeepEqual(testCase.expected, update) {
t.Fatalf("%s: Expected %#v, Got %#v", testCase.desc, testCase.expected, update)
}
case <-time.After(wait.ForeverTestTimeout):
t.Fatalf("%s: Expected update, timeout instead", testCase.desc)
}
}()
}
}
func TestReadPodsFromFileExistLater(t *testing.T) {
watchFileAdded(false, t)
}
func TestReadPodsFromFileChanged(t *testing.T) {
watchFileChanged(false, t)
}
func TestReadPodsFromFileInDirAdded(t *testing.T) {
watchFileAdded(true, t)
}
func TestReadPodsFromFileInDirChanged(t *testing.T) {
watchFileChanged(true, t)
}
func TestExtractFromBadDataFile(t *testing.T) {
dirName, err := utiltesting.MkTmpdir("file-test")
if err != nil {
t.Fatalf("unable to create temp dir: %v", err)
}
defer os.RemoveAll(dirName)
fileName := filepath.Join(dirName, "test_pod_config")
err = ioutil.WriteFile(fileName, []byte{1, 2, 3}, 0555)
if err != nil {
t.Fatalf("unable to write test file %#v", err)
}
ch := make(chan interface{}, 1)
c := new(fileName, "localhost", time.Millisecond, ch)
err = c.resetStoreFromPath()
if err == nil {
t.Fatalf("expected error, got nil")
}
expectEmptyChannel(t, ch)
}
func TestExtractFromEmptyDir(t *testing.T) {
dirName, err := utiltesting.MkTmpdir("file-test")
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
defer os.RemoveAll(dirName)
ch := make(chan interface{}, 1)
c := new(dirName, "localhost", time.Millisecond, ch)
err = c.resetStoreFromPath()
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
update := (<-ch).(kubetypes.PodUpdate)
expected := CreatePodUpdate(kubetypes.SET, kubetypes.FileSource)
if !api.Semantic.DeepEqual(expected, update) {
t.Fatalf("expected %#v, Got %#v", expected, update)
}
}
type testCase struct {
desc string
pod runtime.Object
expected kubetypes.PodUpdate
}
func getTestCases(hostname types.NodeName) []*testCase {
grace := int64(30)
return []*testCase{
{
desc: "Simple pod",
pod: &api.Pod{
TypeMeta: unversioned.TypeMeta{
Kind: "Pod",
APIVersion: "",
},
ObjectMeta: api.ObjectMeta{
Name: "test",
UID: "12345",
Namespace: "mynamespace",
},
Spec: api.PodSpec{
Containers: []api.Container{{Name: "image", Image: "test/image", SecurityContext: securitycontext.ValidSecurityContextWithContainerDefaults()}},
SecurityContext: &api.PodSecurityContext{},
},
Status: api.PodStatus{
Phase: api.PodPending,
},
},
expected: CreatePodUpdate(kubetypes.SET, kubetypes.FileSource, &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "test-" + string(hostname),
UID: "12345",
Namespace: "mynamespace",
Annotations: map[string]string{kubetypes.ConfigHashAnnotationKey: "12345"},
SelfLink: getSelfLink("test-"+string(hostname), "mynamespace"),
},
Spec: api.PodSpec{
NodeName: string(hostname),
RestartPolicy: api.RestartPolicyAlways,
DNSPolicy: api.DNSClusterFirst,
TerminationGracePeriodSeconds: &grace,
Containers: []api.Container{{
Name: "image",
Image: "test/image",
TerminationMessagePath: "/dev/termination-log",
ImagePullPolicy: "Always",
SecurityContext: securitycontext.ValidSecurityContextWithContainerDefaults()}},
SecurityContext: &api.PodSecurityContext{},
},
Status: api.PodStatus{
Phase: api.PodPending,
},
}),
},
}
}
func (tc *testCase) writeToFile(dir, name string, t *testing.T) string {
var versionedPod runtime.Object
err := testapi.Default.Converter().Convert(&tc.pod, &versionedPod, nil)
if err != nil {
t.Fatalf("%s: error in versioning the pod: %v", tc.desc, err)
}
fileContents, err := runtime.Encode(testapi.Default.Codec(), versionedPod)
if err != nil {
t.Fatalf("%s: error in encoding the pod: %v", tc.desc, err)
}
fileName := filepath.Join(dir, name)
if err := writeFile(fileName, []byte(fileContents)); err != nil {
t.Fatalf("unable to write test file %#v", err)
}
return fileName
}
func watchFileAdded(watchDir bool, t *testing.T) {
hostname := types.NodeName("random-test-hostname")
var testCases = getTestCases(hostname)
fileNamePre := "test_pod_config"
for index, testCase := range testCases {
func() {
dirName, err := utiltesting.MkTmpdir("dir-test")
if err != nil {
t.Fatalf("unable to create temp dir: %v", err)
}
defer os.RemoveAll(dirName)
fileName := fmt.Sprintf("%s_%d", fileNamePre, index)
ch := make(chan interface{})
if watchDir {
NewSourceFile(dirName, hostname, 100*time.Millisecond, ch)
} else {
NewSourceFile(filepath.Join(dirName, fileName), hostname, 100*time.Millisecond, ch)
}
expectEmptyUpdate(t, ch)
addFile := func() {
// Add a file
testCase.writeToFile(dirName, fileName, t)
}
if watchDir {
defer func() {
// Remove the file
deleteFile(dirName, fileName, ch, t)
}()
}
go addFile()
if watchDir {
// expect an update by CREATE inotify event
expectUpdate(t, ch, testCase)
// expect an update by MODIFY inotify event
expectUpdate(t, ch, testCase)
from := fileName
fileName = fileName + "_ch"
go changeFileName(dirName, from, fileName, t)
// expect an update by MOVED_FROM inotify event cause changing file name
expectEmptyUpdate(t, ch)
// expect an update by MOVED_TO inotify event cause changing file name
expectUpdate(t, ch, testCase)
} else {
// expect an update by SourceFile.resetStoreFromPath()
expectUpdate(t, ch, testCase)
}
}()
}
}
func watchFileChanged(watchDir bool, t *testing.T) {
hostname := types.NodeName("random-test-hostname")
var testCases = getTestCases(hostname)
fileNamePre := "test_pod_config"
for index, testCase := range testCases {
func() {
dirName, err := utiltesting.MkTmpdir("dir-test")
fileName := fmt.Sprintf("%s_%d", fileNamePre, index)
if err != nil {
t.Fatalf("unable to create temp dir: %v", err)
}
defer os.RemoveAll(dirName)
var file string
lock := &sync.Mutex{}
ch := make(chan interface{})
func() {
lock.Lock()
defer lock.Unlock()
file = testCase.writeToFile(dirName, fileName, t)
}()
if watchDir {
NewSourceFile(dirName, hostname, 100*time.Millisecond, ch)
defer func() {
// Remove the file
deleteFile(dirName, fileName, ch, t)
}()
} else {
NewSourceFile(file, hostname, 100*time.Millisecond, ch)
}
// expect an update by SourceFile.resetStoreFromPath()
expectUpdate(t, ch, testCase)
changeFile := func() {
// Edit the file content
lock.Lock()
defer lock.Unlock()
pod := testCase.pod.(*api.Pod)
pod.Spec.Containers[0].Name = "image2"
testCase.expected.Pods[0].Spec.Containers[0].Name = "image2"
testCase.writeToFile(dirName, fileName, t)
}
go changeFile()
// expect an update by MODIFY inotify event
expectUpdate(t, ch, testCase)
if watchDir {
from := fileName
fileName = fileName + "_ch"
go changeFileName(dirName, from, fileName, t)
// expect an update by MOVED_FROM inotify event cause changing file name
expectEmptyUpdate(t, ch)
// expect an update by MOVED_TO inotify event cause changing file name
expectUpdate(t, ch, testCase)
}
}()
}
}
func deleteFile(dir, file string, ch chan interface{}, t *testing.T) {
go func() {
path := filepath.Join(dir, file)
err := os.Remove(path)
if err != nil {
t.Errorf("unable to remove test file %s: %s", path, err)
}
}()
expectEmptyUpdate(t, ch)
}
func expectUpdate(t *testing.T, ch chan interface{}, testCase *testCase) {
timer := time.After(5 * time.Second)
for {
select {
case got := <-ch:
update := got.(kubetypes.PodUpdate)
for _, pod := range update.Pods {
if errs := validation.ValidatePod(pod); len(errs) > 0 {
t.Fatalf("%s: Invalid pod %#v, %#v", testCase.desc, pod, errs)
}
}
if !api.Semantic.DeepEqual(testCase.expected, update) {
t.Fatalf("%s: Expected: %#v, Got: %#v", testCase.desc, testCase.expected, update)
}
return
case <-timer:
t.Fatalf("%s: Expected update, timeout instead", testCase.desc)
}
}
}
func expectEmptyUpdate(t *testing.T, ch chan interface{}) {
timer := time.After(5 * time.Second)
for {
select {
case got := <-ch:
update := got.(kubetypes.PodUpdate)
if len(update.Pods) != 0 {
t.Fatalf("expected empty update, got %#v", update)
}
return
case <-timer:
t.Fatalf("expected empty update, timeout instead")
}
}
}
func writeFile(filename string, data []byte) error {
f, err := os.OpenFile(filename, os.O_WRONLY|os.O_CREATE, 0666)
if err != nil {
return err
}
n, err := f.Write(data)
if err == nil && n < len(data) {
err = io.ErrShortWrite
}
if err1 := f.Close(); err == nil {
err = err1
}
return err
}
func changeFileName(dir, from, to string, t *testing.T) {
fromPath := filepath.Join(dir, from)
toPath := filepath.Join(dir, to)
if err := exec.Command("mv", fromPath, toPath).Run(); err != nil {
t.Errorf("Fail to change file name: %s", err)
}
}

View File

@ -1,197 +0,0 @@
/*
Copyright 2014 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package config
import (
"io/ioutil"
"os"
"testing"
"time"
"k8s.io/kubernetes/pkg/api"
"k8s.io/kubernetes/pkg/api/testapi"
"k8s.io/kubernetes/pkg/api/unversioned"
"k8s.io/kubernetes/pkg/api/validation"
kubetypes "k8s.io/kubernetes/pkg/kubelet/types"
"k8s.io/kubernetes/pkg/runtime"
"k8s.io/kubernetes/pkg/securitycontext"
"k8s.io/kubernetes/pkg/types"
utiltesting "k8s.io/kubernetes/pkg/util/testing"
"k8s.io/kubernetes/pkg/util/wait"
)
func TestExtractFromNonExistentFile(t *testing.T) {
ch := make(chan interface{}, 1)
c := sourceFile{"/some/fake/file", "localhost", ch}
err := c.extractFromPath()
if err == nil {
t.Errorf("Expected error")
}
}
func TestUpdateOnNonExistentFile(t *testing.T) {
ch := make(chan interface{})
NewSourceFile("random_non_existent_path", "localhost", time.Millisecond, ch)
select {
case got := <-ch:
update := got.(kubetypes.PodUpdate)
expected := CreatePodUpdate(kubetypes.SET, kubetypes.FileSource)
if !api.Semantic.DeepDerivative(expected, update) {
t.Fatalf("Expected %#v, Got %#v", expected, update)
}
case <-time.After(wait.ForeverTestTimeout):
t.Errorf("Expected update, timeout instead")
}
}
func writeTestFile(t *testing.T, dir, name string, contents string) *os.File {
file, err := ioutil.TempFile(os.TempDir(), "test_pod_config")
if err != nil {
t.Fatalf("Unable to create test file %#v", err)
}
file.Close()
if err := ioutil.WriteFile(file.Name(), []byte(contents), 0555); err != nil {
t.Fatalf("Unable to write test file %#v", err)
}
return file
}
func TestReadPodsFromFile(t *testing.T) {
nodeName := "random-test-hostname"
grace := int64(30)
var testCases = []struct {
desc string
pod runtime.Object
expected kubetypes.PodUpdate
}{
{
desc: "Simple pod",
pod: &api.Pod{
TypeMeta: unversioned.TypeMeta{
Kind: "Pod",
APIVersion: "",
},
ObjectMeta: api.ObjectMeta{
Name: "test",
UID: "12345",
Namespace: "mynamespace",
},
Spec: api.PodSpec{
Containers: []api.Container{{Name: "image", Image: "test/image", SecurityContext: securitycontext.ValidSecurityContextWithContainerDefaults()}},
SecurityContext: &api.PodSecurityContext{},
},
Status: api.PodStatus{
Phase: api.PodPending,
},
},
expected: CreatePodUpdate(kubetypes.SET, kubetypes.FileSource, &api.Pod{
ObjectMeta: api.ObjectMeta{
Name: "test-" + nodeName,
UID: "12345",
Namespace: "mynamespace",
Annotations: map[string]string{kubetypes.ConfigHashAnnotationKey: "12345"},
SelfLink: getSelfLink("test-"+nodeName, "mynamespace"),
},
Spec: api.PodSpec{
NodeName: nodeName,
RestartPolicy: api.RestartPolicyAlways,
DNSPolicy: api.DNSClusterFirst,
TerminationGracePeriodSeconds: &grace,
Containers: []api.Container{{
Name: "image",
Image: "test/image",
TerminationMessagePath: "/dev/termination-log",
ImagePullPolicy: "Always",
SecurityContext: securitycontext.ValidSecurityContextWithContainerDefaults()}},
SecurityContext: &api.PodSecurityContext{},
},
Status: api.PodStatus{
Phase: api.PodPending,
},
}),
},
}
for _, testCase := range testCases {
func() {
var versionedPod runtime.Object
err := testapi.Default.Converter().Convert(&testCase.pod, &versionedPod, nil)
if err != nil {
t.Fatalf("%s: error in versioning the pod: %v", testCase.desc, err)
}
fileContents, err := runtime.Encode(testapi.Default.Codec(), versionedPod)
if err != nil {
t.Fatalf("%s: error in encoding the pod: %v", testCase.desc, err)
}
file := writeTestFile(t, os.TempDir(), "test_pod_config", string(fileContents))
defer os.Remove(file.Name())
ch := make(chan interface{})
NewSourceFile(file.Name(), types.NodeName(nodeName), time.Millisecond, ch)
select {
case got := <-ch:
update := got.(kubetypes.PodUpdate)
for _, pod := range update.Pods {
if errs := validation.ValidatePod(pod); len(errs) > 0 {
t.Errorf("%s: Invalid pod %#v, %#v", testCase.desc, pod, errs)
}
}
if !api.Semantic.DeepEqual(testCase.expected, update) {
t.Errorf("%s: Expected %#v, Got %#v", testCase.desc, testCase.expected, update)
}
case <-time.After(wait.ForeverTestTimeout):
t.Errorf("%s: Expected update, timeout instead", testCase.desc)
}
}()
}
}
func TestExtractFromBadDataFile(t *testing.T) {
file := writeTestFile(t, os.TempDir(), "test_pod_config", string([]byte{1, 2, 3}))
defer os.Remove(file.Name())
ch := make(chan interface{}, 1)
c := sourceFile{file.Name(), "localhost", ch}
err := c.extractFromPath()
if err == nil {
t.Fatalf("Expected error")
}
expectEmptyChannel(t, ch)
}
func TestExtractFromEmptyDir(t *testing.T) {
dirName, err := utiltesting.MkTmpdir("file-test")
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
defer os.RemoveAll(dirName)
ch := make(chan interface{}, 1)
c := sourceFile{dirName, "localhost", ch}
err = c.extractFromPath()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
}
update := (<-ch).(kubetypes.PodUpdate)
expected := CreatePodUpdate(kubetypes.SET, kubetypes.FileSource)
if !api.Semantic.DeepEqual(expected, update) {
t.Errorf("Expected %#v, Got %#v", expected, update)
}
}

View File

@ -0,0 +1,26 @@
// +build !linux
/*
Copyright 2016 The Kubernetes Authors.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
// Reads the pod configuration from file or a directory of files.
package config
import "errors"
func (s *sourceFile) watch() error {
return errors.New("source file is unsupported in this build")
}