Pass hostname to all kubelet config sources instead of os.Hostname()

Make applyDefaults be tied to the current config source.
This commit is contained in:
Clayton Coleman 2015-03-22 23:06:12 -04:00
parent bd12cfea69
commit d020ca00b8
9 changed files with 87 additions and 83 deletions

View File

@ -205,7 +205,8 @@ func startComponents(manifestURL, apiVersion string) (string, string) {
scheduler.New(schedulerConfig).Run()
endpoints := service.NewEndpointController(cl)
go util.Forever(func() { endpoints.SyncServiceEndpoints() }, time.Second*10)
// ensure the service endpoints are sync'd several times within the window that the integration tests wait
go util.Forever(func() { endpoints.SyncServiceEndpoints() }, time.Second*4)
controllerManager := replicationControllerPkg.NewReplicationManager(cl)
@ -277,7 +278,9 @@ func endpointsSet(c *client.Client, serviceNamespace, serviceID string, endpoint
glog.Infof("Error on creating endpoints: %v", err)
return false, nil
}
glog.Infof("endpoints: %v", endpoints.Endpoints)
for _, e := range endpoints.Endpoints {
glog.Infof("%s/%s endpoint: %s:%d %#v", serviceNamespace, serviceID, e.IP, e.Port, e.TargetRef)
}
return len(endpoints.Endpoints) == endpointCount, nil
}
}
@ -299,6 +302,9 @@ func podNotFound(c *client.Client, podNamespace string, podID string) wait.Condi
func podRunning(c *client.Client, podNamespace string, podID string) wait.ConditionFunc {
return func() (bool, error) {
pod, err := c.Pods(podNamespace).Get(podID)
if apierrors.IsNotFound(err) {
return false, nil
}
if err != nil {
return false, err
}
@ -321,11 +327,15 @@ containers:
ioutil.WriteFile(manifestFile.Name(), []byte(manifest), 0600)
// Wait for the mirror pod to be created.
hostname, _ := os.Hostname()
podName := fmt.Sprintf("static-pod-%s", hostname)
podName := "static-pod-localhost"
namespace := kubelet.NamespaceDefault
if err := wait.Poll(time.Second, time.Second*30,
if err := wait.Poll(time.Second, time.Minute*2,
podRunning(c, namespace, podName)); err != nil {
if pods, err := c.Pods(namespace).List(labels.Everything()); err == nil {
for _, pod := range pods.Items {
glog.Infof("pod found: %s/%s", namespace, pod.Name)
}
}
glog.Fatalf("FAILED: mirror pod has not been created or is not running: %v", err)
}
// Delete the mirror pod, and wait for it to be recreated.
@ -713,7 +723,7 @@ func runServiceTest(client *client.Client) {
glog.Fatalf("Failed to create service: %v, %v", svc3, err)
}
if err := wait.Poll(time.Second, time.Second*20, endpointsSet(client, svc1.Namespace, svc1.Name, 1)); err != nil {
if err := wait.Poll(time.Second, time.Second*30, endpointsSet(client, svc1.Namespace, svc1.Name, 1)); err != nil {
glog.Fatalf("FAILED: unexpected endpoints: %v", err)
}
// A second service with the same port.
@ -732,7 +742,7 @@ func runServiceTest(client *client.Client) {
if err != nil {
glog.Fatalf("Failed to create service: %v, %v", svc2, err)
}
if err := wait.Poll(time.Second, time.Second*20, endpointsSet(client, svc2.Namespace, svc2.Name, 1)); err != nil {
if err := wait.Poll(time.Second, time.Second*30, endpointsSet(client, svc2.Namespace, svc2.Name, 1)); err != nil {
glog.Fatalf("FAILED: unexpected endpoints: %v", err)
}

View File

@ -358,13 +358,13 @@ func makePodSourceConfig(kc *KubeletConfig) *config.PodConfig {
// define file config source
if kc.ConfigFile != "" {
glog.Infof("Adding manifest file: %v", kc.ConfigFile)
config.NewSourceFile(kc.ConfigFile, kc.FileCheckFrequency, cfg.Channel(kubelet.FileSource))
config.NewSourceFile(kc.ConfigFile, kc.Hostname, kc.FileCheckFrequency, cfg.Channel(kubelet.FileSource))
}
// define url config source
if kc.ManifestURL != "" {
glog.Infof("Adding manifest url: %v", kc.ManifestURL)
config.NewSourceURL(kc.ManifestURL, kc.HTTPCheckFrequency, cfg.Channel(kubelet.HTTPSource))
config.NewSourceURL(kc.ManifestURL, kc.Hostname, kc.HTTPCheckFrequency, cfg.Channel(kubelet.HTTPSource))
}
if kc.KubeClient != nil {
glog.Infof("Watching apiserver")

View File

@ -26,8 +26,6 @@ import (
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
)
const hostname string = "mcaa1"
type fakePodLW struct {
listResp runtime.Object
watchResp watch.Interface

View File

@ -21,8 +21,6 @@ import (
"crypto/md5"
"encoding/hex"
"fmt"
"os"
"strings"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta1"
@ -35,15 +33,10 @@ import (
"github.com/golang/glog"
)
func applyDefaults(pod *api.Pod, source string, isFile bool) error {
func applyDefaults(pod *api.Pod, source string, isFile bool, hostname string) error {
if len(pod.UID) == 0 {
hasher := md5.New()
if isFile {
hostname, err := os.Hostname() // TODO: kubelet name would be better
if err != nil {
return err
}
hostname = strings.ToLower(hostname)
fmt.Fprintf(hasher, "host:%s", hostname)
fmt.Fprintf(hasher, "file:%s", source)
} else {
@ -60,7 +53,7 @@ func applyDefaults(pod *api.Pod, source string, isFile bool) error {
if len(pod.Name) == 0 {
pod.Name = string(pod.UID)
}
if pod.Name, err = GeneratePodName(pod.Name); err != nil {
if pod.Name, err = GeneratePodName(pod.Name, hostname); err != nil {
return err
}
glog.V(5).Infof("Generated Name %q for UID %q from URL %s", pod.Name, pod.UID, source)
@ -76,7 +69,9 @@ func applyDefaults(pod *api.Pod, source string, isFile bool) error {
return nil
}
func tryDecodeSinglePod(data []byte, source string, isFile bool) (parsed bool, pod api.Pod, err error) {
type defaultFunc func(pod *api.Pod) error
func tryDecodeSinglePod(data []byte, defaultFn defaultFunc) (parsed bool, pod api.Pod, err error) {
obj, err := api.Scheme.Decode(data)
if err != nil {
return false, pod, err
@ -88,7 +83,7 @@ func tryDecodeSinglePod(data []byte, source string, isFile bool) (parsed bool, p
}
newPod := obj.(*api.Pod)
// Apply default values and validate the pod.
if err = applyDefaults(newPod, source, isFile); err != nil {
if err = defaultFn(newPod); err != nil {
return true, pod, err
}
if errs := validation.ValidatePod(newPod); len(errs) > 0 {
@ -98,7 +93,7 @@ func tryDecodeSinglePod(data []byte, source string, isFile bool) (parsed bool, p
return true, *newPod, nil
}
func tryDecodePodList(data []byte, source string, isFile bool) (parsed bool, pods api.PodList, err error) {
func tryDecodePodList(data []byte, defaultFn defaultFunc) (parsed bool, pods api.PodList, err error) {
obj, err := api.Scheme.Decode(data)
if err != nil {
return false, pods, err
@ -112,7 +107,7 @@ func tryDecodePodList(data []byte, source string, isFile bool) (parsed bool, pod
// Apply default values and validate pods.
for i := range newPods.Items {
newPod := &newPods.Items[i]
if err = applyDefaults(newPod, source, isFile); err != nil {
if err = defaultFn(newPod); err != nil {
return true, pods, err
}
if errs := validation.ValidatePod(newPod); len(errs) > 0 {
@ -123,7 +118,7 @@ func tryDecodePodList(data []byte, source string, isFile bool) (parsed bool, pod
return true, *newPods, err
}
func tryDecodeSingleManifest(data []byte, source string, isFile bool) (parsed bool, manifest v1beta1.ContainerManifest, pod api.Pod, err error) {
func tryDecodeSingleManifest(data []byte, defaultFn defaultFunc) (parsed bool, manifest v1beta1.ContainerManifest, pod api.Pod, err error) {
// TODO: should be api.Scheme.Decode
// This is awful. DecodeInto() expects to find an APIObject, which
// Manifest is not. We keep reading manifest for now for compat, but
@ -149,14 +144,14 @@ func tryDecodeSingleManifest(data []byte, source string, isFile bool) (parsed bo
if err = api.Scheme.Convert(&newManifest, &pod); err != nil {
return true, manifest, pod, err
}
if err = applyDefaults(&pod, source, isFile); err != nil {
if err := defaultFn(&pod); err != nil {
return true, manifest, pod, err
}
// Success.
return true, manifest, pod, nil
}
func tryDecodeManifestList(data []byte, source string, isFile bool) (parsed bool, manifests []v1beta1.ContainerManifest, pods api.PodList, err error) {
func tryDecodeManifestList(data []byte, defaultFn defaultFunc) (parsed bool, manifests []v1beta1.ContainerManifest, pods api.PodList, err error) {
// TODO: should be api.Scheme.Decode
// See the comment in tryDecodeSingle().
if err = yaml.Unmarshal(data, &manifests); err != nil {
@ -179,7 +174,7 @@ func tryDecodeManifestList(data []byte, source string, isFile bool) (parsed bool
}
for i := range pods.Items {
pod := &pods.Items[i]
if err = applyDefaults(pod, source, isFile); err != nil {
if err := defaultFn(pod); err != nil {
return true, manifests, pods, err
}
}

View File

@ -18,9 +18,7 @@ package config
import (
"fmt"
"os"
"reflect"
"strings"
"sync"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
@ -363,11 +361,6 @@ func bestPodIdentString(pod *api.Pod) string {
return fmt.Sprintf("%s.%s", name, namespace)
}
func GeneratePodName(name string) (string, error) {
hostname, err := os.Hostname() //TODO: kubelet name would be better
if err != nil {
return "", err
}
hostname = strings.ToLower(hostname)
func GeneratePodName(name, hostname string) (string, error) {
return fmt.Sprintf("%s-%s", name, hostname), nil
}

View File

@ -34,12 +34,14 @@ import (
type sourceFile struct {
path string
hostname string
updates chan<- interface{}
}
func NewSourceFile(path string, period time.Duration, updates chan<- interface{}) {
func NewSourceFile(path string, hostname string, period time.Duration, updates chan<- interface{}) {
config := &sourceFile{
path: path,
hostname: hostname,
updates: updates,
}
glog.V(1).Infof("Watching path %q", path)
@ -52,6 +54,10 @@ func (s *sourceFile) run() {
}
}
func (s *sourceFile) applyDefaults(pod *api.Pod, source string) error {
return applyDefaults(pod, source, true, s.hostname)
}
func (s *sourceFile) extractFromPath() error {
path := s.path
statInfo, err := os.Stat(path)
@ -66,14 +72,14 @@ func (s *sourceFile) extractFromPath() error {
switch {
case statInfo.Mode().IsDir():
pods, err := extractFromDir(path)
pods, err := s.extractFromDir(path)
if err != nil {
return err
}
s.updates <- kubelet.PodUpdate{pods, kubelet.SET, kubelet.FileSource}
case statInfo.Mode().IsRegular():
pod, err := extractFromFile(path)
pod, err := s.extractFromFile(path)
if err != nil {
return err
}
@ -89,7 +95,7 @@ func (s *sourceFile) extractFromPath() error {
// Get as many pod configs as we can from a directory. Return an error iff something
// prevented us from reading anything at all. Do not return an error if only some files
// were problematic.
func extractFromDir(name string) ([]api.Pod, error) {
func (s *sourceFile) extractFromDir(name string) ([]api.Pod, error) {
dirents, err := filepath.Glob(filepath.Join(name, "[^.]*"))
if err != nil {
return nil, fmt.Errorf("glob failed: %v", err)
@ -112,7 +118,7 @@ func extractFromDir(name string) ([]api.Pod, error) {
case statInfo.Mode().IsDir():
glog.V(1).Infof("Not recursing into config path %q", path)
case statInfo.Mode().IsRegular():
pod, err := extractFromFile(path)
pod, err := s.extractFromFile(path)
if err != nil {
glog.V(1).Infof("Can't process config file %q: %v", path, err)
} else {
@ -125,7 +131,7 @@ func extractFromDir(name string) ([]api.Pod, error) {
return pods, nil
}
func extractFromFile(filename string) (pod api.Pod, err error) {
func (s *sourceFile) extractFromFile(filename string) (pod api.Pod, err error) {
glog.V(3).Infof("Reading config file %q", filename)
file, err := os.Open(filename)
if err != nil {
@ -138,7 +144,11 @@ func extractFromFile(filename string) (pod api.Pod, err error) {
return pod, err
}
parsed, _, pod, manifestErr := tryDecodeSingleManifest(data, filename, true)
defaultFn := func(pod *api.Pod) error {
return s.applyDefaults(pod, filename)
}
parsed, _, pod, manifestErr := tryDecodeSingleManifest(data, defaultFn)
if parsed {
if manifestErr != nil {
// It parsed but could not be used.
@ -147,7 +157,7 @@ func extractFromFile(filename string) (pod api.Pod, err error) {
return pod, nil
}
parsed, pod, podErr := tryDecodeSinglePod(data, filename, true)
parsed, pod, podErr := tryDecodeSinglePod(data, defaultFn)
if parsed {
if podErr != nil {
return pod, podErr

View File

@ -21,7 +21,6 @@ import (
"io/ioutil"
"os"
"sort"
"strings"
"testing"
"time"
@ -34,7 +33,7 @@ import (
func TestExtractFromNonExistentFile(t *testing.T) {
ch := make(chan interface{}, 1)
c := sourceFile{"/some/fake/file", ch}
c := sourceFile{"/some/fake/file", "localhost", ch}
err := c.extractFromPath()
if err == nil {
t.Errorf("Expected error")
@ -43,7 +42,7 @@ func TestExtractFromNonExistentFile(t *testing.T) {
func TestUpdateOnNonExistentFile(t *testing.T) {
ch := make(chan interface{})
NewSourceFile("random_non_existent_path", time.Millisecond, ch)
NewSourceFile("random_non_existent_path", "localhost", time.Millisecond, ch)
select {
case got := <-ch:
update := got.(kubelet.PodUpdate)
@ -70,9 +69,7 @@ func writeTestFile(t *testing.T, dir, name string, contents string) *os.File {
}
func TestReadFromFile(t *testing.T) {
hostname, _ := os.Hostname()
hostname = strings.ToLower(hostname)
hostname := "random-test-hostname"
var testCases = []struct {
desc string
fileContents string
@ -256,7 +253,7 @@ func TestReadFromFile(t *testing.T) {
defer os.Remove(file.Name())
ch := make(chan interface{})
NewSourceFile(file.Name(), time.Millisecond, ch)
NewSourceFile(file.Name(), hostname, time.Millisecond, ch)
select {
case got := <-ch:
update := got.(kubelet.PodUpdate)
@ -285,7 +282,7 @@ func TestReadManifestFromFileWithDefaults(t *testing.T) {
defer os.Remove(file.Name())
ch := make(chan interface{})
NewSourceFile(file.Name(), time.Millisecond, ch)
NewSourceFile(file.Name(), "localhost", time.Millisecond, ch)
select {
case got := <-ch:
update := got.(kubelet.PodUpdate)
@ -303,7 +300,7 @@ func TestExtractFromBadDataFile(t *testing.T) {
defer os.Remove(file.Name())
ch := make(chan interface{}, 1)
c := sourceFile{file.Name(), ch}
c := sourceFile{file.Name(), "localhost", ch}
err := c.extractFromPath()
if err == nil {
t.Fatalf("Expected error")
@ -319,7 +316,7 @@ func TestExtractFromEmptyDir(t *testing.T) {
defer os.RemoveAll(dirName)
ch := make(chan interface{}, 1)
c := sourceFile{dirName, ch}
c := sourceFile{dirName, "localhost", ch}
err = c.extractFromPath()
if err != nil {
t.Fatalf("Unexpected error: %v", err)
@ -333,8 +330,7 @@ func TestExtractFromEmptyDir(t *testing.T) {
}
func ExampleManifestAndPod(id string) (v1beta1.ContainerManifest, api.Pod) {
hostname, _ := os.Hostname()
hostname = strings.ToLower(hostname)
hostname := "an-example-host"
manifest := v1beta1.ContainerManifest{
Version: "v1beta1",
@ -417,7 +413,7 @@ func TestExtractFromDir(t *testing.T) {
}
ch := make(chan interface{}, 1)
c := sourceFile{dirName, ch}
c := sourceFile{dirName, "an-example-host", ch}
err = c.extractFromPath()
if err != nil {
t.Fatalf("Unexpected error: %v", err)

View File

@ -33,13 +33,15 @@ import (
type sourceURL struct {
url string
hostname string
updates chan<- interface{}
data []byte
}
func NewSourceURL(url string, period time.Duration, updates chan<- interface{}) {
func NewSourceURL(url, hostname string, period time.Duration, updates chan<- interface{}) {
config := &sourceURL{
url: url,
hostname: hostname,
updates: updates,
data: nil,
}
@ -53,6 +55,10 @@ func (s *sourceURL) run() {
}
}
func (s *sourceURL) applyDefaults(pod *api.Pod) error {
return applyDefaults(pod, s.url, false, s.hostname)
}
func (s *sourceURL) extractFromURL() error {
resp, err := http.Get(s.url)
if err != nil {
@ -78,7 +84,7 @@ func (s *sourceURL) extractFromURL() error {
s.data = data
// First try as if it's a single manifest
parsed, manifest, pod, singleErr := tryDecodeSingleManifest(data, s.url, false)
parsed, manifest, pod, singleErr := tryDecodeSingleManifest(data, s.applyDefaults)
if parsed {
if singleErr != nil {
// It parsed but could not be used.
@ -90,7 +96,7 @@ func (s *sourceURL) extractFromURL() error {
}
// That didn't work, so try an array of manifests.
parsed, manifests, pods, multiErr := tryDecodeManifestList(data, s.url, false)
parsed, manifests, pods, multiErr := tryDecodeManifestList(data, s.applyDefaults)
if parsed {
if multiErr != nil {
// It parsed but could not be used.
@ -112,7 +118,7 @@ func (s *sourceURL) extractFromURL() error {
// Try to parse it as Pod(s).
// First try as it is a single pod.
parsed, pod, singlePodErr := tryDecodeSinglePod(data, s.url, false)
parsed, pod, singlePodErr := tryDecodeSinglePod(data, s.applyDefaults)
if parsed {
if singlePodErr != nil {
// It parsed but could not be used.
@ -123,7 +129,7 @@ func (s *sourceURL) extractFromURL() error {
}
// That didn't work, so try a list of pods.
parsed, pods, multiPodErr := tryDecodePodList(data, s.url, false)
parsed, pods, multiPodErr := tryDecodePodList(data, s.applyDefaults)
if parsed {
if multiPodErr != nil {
// It parsed but could not be used.

View File

@ -19,8 +19,6 @@ package config
import (
"encoding/json"
"net/http/httptest"
"os"
"strings"
"testing"
"time"
@ -35,7 +33,7 @@ import (
func TestURLErrorNotExistNoUpdate(t *testing.T) {
ch := make(chan interface{})
NewSourceURL("http://localhost:49575/_not_found_", time.Millisecond, ch)
NewSourceURL("http://localhost:49575/_not_found_", "localhost", time.Millisecond, ch)
select {
case got := <-ch:
t.Errorf("Expected no update, Got %#v", got)
@ -45,7 +43,7 @@ func TestURLErrorNotExistNoUpdate(t *testing.T) {
func TestExtractFromHttpBadness(t *testing.T) {
ch := make(chan interface{}, 1)
c := sourceURL{"http://localhost:49575/_not_found_", ch, nil}
c := sourceURL{"http://localhost:49575/_not_found_", "other", ch, nil}
if err := c.extractFromURL(); err == nil {
t.Errorf("Expected error")
}
@ -111,7 +109,7 @@ func TestExtractInvalidManifest(t *testing.T) {
testServer := httptest.NewServer(&fakeHandler)
defer testServer.Close()
ch := make(chan interface{}, 1)
c := sourceURL{testServer.URL, ch, nil}
c := sourceURL{testServer.URL, "localhost", ch, nil}
if err := c.extractFromURL(); err == nil {
t.Errorf("%s: Expected error", testCase.desc)
}
@ -119,8 +117,7 @@ func TestExtractInvalidManifest(t *testing.T) {
}
func TestExtractManifestFromHTTP(t *testing.T) {
hostname, _ := os.Hostname()
hostname = strings.ToLower(hostname)
hostname := "random-hostname"
var testCases = []struct {
desc string
@ -263,7 +260,7 @@ func TestExtractManifestFromHTTP(t *testing.T) {
testServer := httptest.NewServer(&fakeHandler)
defer testServer.Close()
ch := make(chan interface{}, 1)
c := sourceURL{testServer.URL, ch, nil}
c := sourceURL{testServer.URL, hostname, ch, nil}
if err := c.extractFromURL(); err != nil {
t.Errorf("%s: Unexpected error: %v", testCase.desc, err)
continue
@ -290,8 +287,7 @@ func TestExtractManifestFromHTTP(t *testing.T) {
}
func TestExtractPodsFromHTTP(t *testing.T) {
hostname, _ := os.Hostname()
hostname = strings.ToLower(hostname)
hostname := "different-value"
var testCases = []struct {
desc string
@ -454,7 +450,7 @@ func TestExtractPodsFromHTTP(t *testing.T) {
testServer := httptest.NewServer(&fakeHandler)
defer testServer.Close()
ch := make(chan interface{}, 1)
c := sourceURL{testServer.URL, ch, nil}
c := sourceURL{testServer.URL, hostname, ch, nil}
if err := c.extractFromURL(); err != nil {
t.Errorf("%s: Unexpected error: %v", testCase.desc, err)
continue