diff --git a/cmd/kubelet/app/server.go b/cmd/kubelet/app/server.go index 16dcf4d28fc..972a9229838 100644 --- a/cmd/kubelet/app/server.go +++ b/cmd/kubelet/app/server.go @@ -69,6 +69,7 @@ type KubeletServer struct { FileCheckFrequency time.Duration HTTPCheckFrequency time.Duration ManifestURL string + ManifestURLHeader string EnableServer bool Address util.IP Port uint @@ -193,6 +194,7 @@ func (s *KubeletServer) AddFlags(fs *pflag.FlagSet) { fs.DurationVar(&s.FileCheckFrequency, "file-check-frequency", s.FileCheckFrequency, "Duration between checking config files for new data") fs.DurationVar(&s.HTTPCheckFrequency, "http-check-frequency", s.HTTPCheckFrequency, "Duration between checking http for new data") fs.StringVar(&s.ManifestURL, "manifest-url", s.ManifestURL, "URL for accessing the container manifest") + fs.StringVar(&s.ManifestURLHeader, "manifest-url-header", s.ManifestURLHeader, "HTTP header to use when accessing the manifest URL, with the key separated from the value with a ':', as in 'key:value'") fs.BoolVar(&s.EnableServer, "enable-server", s.EnableServer, "Enable the Kubelet's server") fs.Var(&s.Address, "address", "The IP address for the Kubelet to serve on (set to 0.0.0.0 for all interfaces)") fs.UintVar(&s.Port, "port", s.Port, "The port for the Kubelet to serve on. Note that \"kubectl logs\" will not work if you set this flag.") // see #9325 @@ -295,6 +297,15 @@ func (s *KubeletServer) Run(_ []string) error { } glog.V(2).Infof("Successfully initialized cloud provider: %q from the config file: %q\n", s.CloudProvider, s.CloudConfigFile) + manifestURLHeader := make(http.Header) + if s.ManifestURLHeader != "" { + pieces := strings.Split(s.ManifestURLHeader, ":") + if len(pieces) != 2 { + return fmt.Errorf("manifest-url-header must have a single ':' key-value separator, got %q", s.ManifestURLHeader) + } + manifestURLHeader.Set(pieces[0], pieces[1]) + } + hostNetworkSources, err := kubelet.GetValidatedSources(strings.Split(s.HostNetworkSources, ",")) if err != nil { return err @@ -330,6 +341,7 @@ func (s *KubeletServer) Run(_ []string) error { RootDirectory: s.RootDirectory, ConfigFile: s.Config, ManifestURL: s.ManifestURL, + ManifestURLHeader: manifestURLHeader, FileCheckFrequency: s.FileCheckFrequency, HTTPCheckFrequency: s.HTTPCheckFrequency, PodInfraContainerImage: s.PodInfraContainerImage, @@ -660,8 +672,8 @@ func makePodSourceConfig(kc *KubeletConfig) *config.PodConfig { // define url config source if kc.ManifestURL != "" { - glog.Infof("Adding manifest url: %v", kc.ManifestURL) - config.NewSourceURL(kc.ManifestURL, kc.NodeName, kc.HTTPCheckFrequency, cfg.Channel(kubelet.HTTPSource)) + glog.Infof("Adding manifest url %q with HTTP header %v", kc.ManifestURL, kc.ManifestURLHeader) + config.NewSourceURL(kc.ManifestURL, kc.ManifestURLHeader, kc.NodeName, kc.HTTPCheckFrequency, cfg.Channel(kubelet.HTTPSource)) } if kc.KubeClient != nil { glog.Infof("Watching apiserver") @@ -683,6 +695,7 @@ type KubeletConfig struct { RootDirectory string ConfigFile string ManifestURL string + ManifestURLHeader http.Header FileCheckFrequency time.Duration HTTPCheckFrequency time.Duration Hostname string diff --git a/pkg/kubelet/config/http.go b/pkg/kubelet/config/http.go index de536ae3b74..9467915b966 100644 --- a/pkg/kubelet/config/http.go +++ b/pkg/kubelet/config/http.go @@ -32,15 +32,18 @@ import ( ) type sourceURL struct { - url string - nodeName string - updates chan<- interface{} - data []byte + url string + header http.Header + nodeName string + updates chan<- interface{} + data []byte + failureLogs int } -func NewSourceURL(url, nodeName string, period time.Duration, updates chan<- interface{}) { +func NewSourceURL(url string, header http.Header, nodeName string, period time.Duration, updates chan<- interface{}) { config := &sourceURL{ url: url, + header: header, nodeName: nodeName, updates: updates, data: nil, @@ -51,7 +54,19 @@ func NewSourceURL(url, nodeName string, period time.Duration, updates chan<- int func (s *sourceURL) run() { if err := s.extractFromURL(); err != nil { - glog.Errorf("Failed to read URL: %v", err) + // Don't log this multiple times per minute. The first few entries should be + // enough to get the point across. + if s.failureLogs < 3 { + glog.Warningf("Failed to read pods from URL: %v", err) + } else if s.failureLogs == 3 { + glog.Warningf("Failed to read pods from URL. Won't log this message anymore: %v", err) + } + s.failureLogs++ + } else { + if s.failureLogs > 0 { + glog.Info("Successfully read pods from URL.") + s.failureLogs = 0 + } } } @@ -60,7 +75,13 @@ func (s *sourceURL) applyDefaults(pod *api.Pod) error { } func (s *sourceURL) extractFromURL() error { - resp, err := http.Get(s.url) + req, err := http.NewRequest("GET", s.url, nil) + if err != nil { + return err + } + req.Header = s.header + client := &http.Client{} + resp, err := client.Do(req) if err != nil { return err } diff --git a/pkg/kubelet/config/http_test.go b/pkg/kubelet/config/http_test.go index ab3487993bf..69e3faaa146 100644 --- a/pkg/kubelet/config/http_test.go +++ b/pkg/kubelet/config/http_test.go @@ -18,6 +18,7 @@ package config import ( "encoding/json" + "net/http" "net/http/httptest" "testing" "time" @@ -33,7 +34,7 @@ import ( func TestURLErrorNotExistNoUpdate(t *testing.T) { ch := make(chan interface{}) - NewSourceURL("http://localhost:49575/_not_found_", "localhost", time.Millisecond, ch) + NewSourceURL("http://localhost:49575/_not_found_", http.Header{}, "localhost", time.Millisecond, ch) select { case got := <-ch: t.Errorf("Expected no update, Got %#v", got) @@ -43,7 +44,7 @@ func TestURLErrorNotExistNoUpdate(t *testing.T) { func TestExtractFromHttpBadness(t *testing.T) { ch := make(chan interface{}, 1) - c := sourceURL{"http://localhost:49575/_not_found_", "other", ch, nil} + c := sourceURL{"http://localhost:49575/_not_found_", http.Header{}, "other", ch, nil, 0} if err := c.extractFromURL(); err == nil { t.Errorf("Expected error") } @@ -112,7 +113,7 @@ func TestExtractInvalidPods(t *testing.T) { testServer := httptest.NewServer(&fakeHandler) defer testServer.Close() ch := make(chan interface{}, 1) - c := sourceURL{testServer.URL, "localhost", ch, nil} + c := sourceURL{testServer.URL, http.Header{}, "localhost", ch, nil, 0} if err := c.extractFromURL(); err == nil { t.Errorf("%s: Expected error", testCase.desc) } @@ -259,7 +260,7 @@ func TestExtractPodsFromHTTP(t *testing.T) { testServer := httptest.NewServer(&fakeHandler) defer testServer.Close() ch := make(chan interface{}, 1) - c := sourceURL{testServer.URL, hostname, ch, nil} + c := sourceURL{testServer.URL, http.Header{}, hostname, ch, nil, 0} if err := c.extractFromURL(); err != nil { t.Errorf("%s: Unexpected error: %v", testCase.desc, err) continue @@ -276,3 +277,47 @@ func TestExtractPodsFromHTTP(t *testing.T) { } } } + +func TestURLWithHeader(t *testing.T) { + pod := &api.Pod{ + TypeMeta: api.TypeMeta{ + APIVersion: testapi.Version(), + Kind: "Pod", + }, + ObjectMeta: api.ObjectMeta{ + Name: "foo", + UID: "111", + Namespace: "mynamespace", + }, + Spec: api.PodSpec{ + NodeName: "localhost", + Containers: []api.Container{{Name: "1", Image: "foo", ImagePullPolicy: api.PullAlways}}, + }, + } + data, err := json.Marshal(pod) + if err != nil { + t.Fatalf("Unexpected json marshalling error: %v", err) + } + fakeHandler := util.FakeHandler{ + StatusCode: 200, + ResponseBody: string(data), + } + testServer := httptest.NewServer(&fakeHandler) + defer testServer.Close() + ch := make(chan interface{}, 1) + header := make(http.Header) + header.Set("Metadata-Flavor", "Google") + c := sourceURL{testServer.URL, header, "localhost", ch, nil, 0} + if err := c.extractFromURL(); err != nil { + t.Fatalf("Unexpected error extracting from URL: %v", err) + } + update := (<-ch).(kubelet.PodUpdate) + + headerVal := fakeHandler.RequestReceived.Header["Metadata-Flavor"] + if len(headerVal) != 1 || headerVal[0] != "Google" { + t.Errorf("Header missing expected entry %v. Got %v", header, fakeHandler.RequestReceived.Header) + } + if len(update.Pods) != 1 { + t.Errorf("Received wrong number of pods, expected one: %v", update.Pods) + } +}