mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-23 11:50:44 +00:00
Merge pull request #12024 from a-robinson/metadata
Support passing a header to the manifest URL in the kubelet.
This commit is contained in:
commit
9582c7e54e
@ -69,6 +69,7 @@ type KubeletServer struct {
|
||||
FileCheckFrequency time.Duration
|
||||
HTTPCheckFrequency time.Duration
|
||||
ManifestURL string
|
||||
ManifestURLHeader string
|
||||
EnableServer bool
|
||||
Address util.IP
|
||||
Port uint
|
||||
@ -193,6 +194,7 @@ func (s *KubeletServer) AddFlags(fs *pflag.FlagSet) {
|
||||
fs.DurationVar(&s.FileCheckFrequency, "file-check-frequency", s.FileCheckFrequency, "Duration between checking config files for new data")
|
||||
fs.DurationVar(&s.HTTPCheckFrequency, "http-check-frequency", s.HTTPCheckFrequency, "Duration between checking http for new data")
|
||||
fs.StringVar(&s.ManifestURL, "manifest-url", s.ManifestURL, "URL for accessing the container manifest")
|
||||
fs.StringVar(&s.ManifestURLHeader, "manifest-url-header", s.ManifestURLHeader, "HTTP header to use when accessing the manifest URL, with the key separated from the value with a ':', as in 'key:value'")
|
||||
fs.BoolVar(&s.EnableServer, "enable-server", s.EnableServer, "Enable the Kubelet's server")
|
||||
fs.Var(&s.Address, "address", "The IP address for the Kubelet to serve on (set to 0.0.0.0 for all interfaces)")
|
||||
fs.UintVar(&s.Port, "port", s.Port, "The port for the Kubelet to serve on. Note that \"kubectl logs\" will not work if you set this flag.") // see #9325
|
||||
@ -295,6 +297,15 @@ func (s *KubeletServer) Run(_ []string) error {
|
||||
}
|
||||
glog.V(2).Infof("Successfully initialized cloud provider: %q from the config file: %q\n", s.CloudProvider, s.CloudConfigFile)
|
||||
|
||||
manifestURLHeader := make(http.Header)
|
||||
if s.ManifestURLHeader != "" {
|
||||
pieces := strings.Split(s.ManifestURLHeader, ":")
|
||||
if len(pieces) != 2 {
|
||||
return fmt.Errorf("manifest-url-header must have a single ':' key-value separator, got %q", s.ManifestURLHeader)
|
||||
}
|
||||
manifestURLHeader.Set(pieces[0], pieces[1])
|
||||
}
|
||||
|
||||
hostNetworkSources, err := kubelet.GetValidatedSources(strings.Split(s.HostNetworkSources, ","))
|
||||
if err != nil {
|
||||
return err
|
||||
@ -330,6 +341,7 @@ func (s *KubeletServer) Run(_ []string) error {
|
||||
RootDirectory: s.RootDirectory,
|
||||
ConfigFile: s.Config,
|
||||
ManifestURL: s.ManifestURL,
|
||||
ManifestURLHeader: manifestURLHeader,
|
||||
FileCheckFrequency: s.FileCheckFrequency,
|
||||
HTTPCheckFrequency: s.HTTPCheckFrequency,
|
||||
PodInfraContainerImage: s.PodInfraContainerImage,
|
||||
@ -660,8 +672,8 @@ func makePodSourceConfig(kc *KubeletConfig) *config.PodConfig {
|
||||
|
||||
// define url config source
|
||||
if kc.ManifestURL != "" {
|
||||
glog.Infof("Adding manifest url: %v", kc.ManifestURL)
|
||||
config.NewSourceURL(kc.ManifestURL, kc.NodeName, kc.HTTPCheckFrequency, cfg.Channel(kubelet.HTTPSource))
|
||||
glog.Infof("Adding manifest url %q with HTTP header %v", kc.ManifestURL, kc.ManifestURLHeader)
|
||||
config.NewSourceURL(kc.ManifestURL, kc.ManifestURLHeader, kc.NodeName, kc.HTTPCheckFrequency, cfg.Channel(kubelet.HTTPSource))
|
||||
}
|
||||
if kc.KubeClient != nil {
|
||||
glog.Infof("Watching apiserver")
|
||||
@ -683,6 +695,7 @@ type KubeletConfig struct {
|
||||
RootDirectory string
|
||||
ConfigFile string
|
||||
ManifestURL string
|
||||
ManifestURLHeader http.Header
|
||||
FileCheckFrequency time.Duration
|
||||
HTTPCheckFrequency time.Duration
|
||||
Hostname string
|
||||
|
@ -32,15 +32,18 @@ import (
|
||||
)
|
||||
|
||||
type sourceURL struct {
|
||||
url string
|
||||
nodeName string
|
||||
updates chan<- interface{}
|
||||
data []byte
|
||||
url string
|
||||
header http.Header
|
||||
nodeName string
|
||||
updates chan<- interface{}
|
||||
data []byte
|
||||
failureLogs int
|
||||
}
|
||||
|
||||
func NewSourceURL(url, nodeName string, period time.Duration, updates chan<- interface{}) {
|
||||
func NewSourceURL(url string, header http.Header, nodeName string, period time.Duration, updates chan<- interface{}) {
|
||||
config := &sourceURL{
|
||||
url: url,
|
||||
header: header,
|
||||
nodeName: nodeName,
|
||||
updates: updates,
|
||||
data: nil,
|
||||
@ -51,7 +54,19 @@ func NewSourceURL(url, nodeName string, period time.Duration, updates chan<- int
|
||||
|
||||
func (s *sourceURL) run() {
|
||||
if err := s.extractFromURL(); err != nil {
|
||||
glog.Errorf("Failed to read URL: %v", err)
|
||||
// Don't log this multiple times per minute. The first few entries should be
|
||||
// enough to get the point across.
|
||||
if s.failureLogs < 3 {
|
||||
glog.Warningf("Failed to read pods from URL: %v", err)
|
||||
} else if s.failureLogs == 3 {
|
||||
glog.Warningf("Failed to read pods from URL. Won't log this message anymore: %v", err)
|
||||
}
|
||||
s.failureLogs++
|
||||
} else {
|
||||
if s.failureLogs > 0 {
|
||||
glog.Info("Successfully read pods from URL.")
|
||||
s.failureLogs = 0
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@ -60,7 +75,13 @@ func (s *sourceURL) applyDefaults(pod *api.Pod) error {
|
||||
}
|
||||
|
||||
func (s *sourceURL) extractFromURL() error {
|
||||
resp, err := http.Get(s.url)
|
||||
req, err := http.NewRequest("GET", s.url, nil)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
req.Header = s.header
|
||||
client := &http.Client{}
|
||||
resp, err := client.Do(req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -18,6 +18,7 @@ package config
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"net/http"
|
||||
"net/http/httptest"
|
||||
"testing"
|
||||
"time"
|
||||
@ -33,7 +34,7 @@ import (
|
||||
|
||||
func TestURLErrorNotExistNoUpdate(t *testing.T) {
|
||||
ch := make(chan interface{})
|
||||
NewSourceURL("http://localhost:49575/_not_found_", "localhost", time.Millisecond, ch)
|
||||
NewSourceURL("http://localhost:49575/_not_found_", http.Header{}, "localhost", time.Millisecond, ch)
|
||||
select {
|
||||
case got := <-ch:
|
||||
t.Errorf("Expected no update, Got %#v", got)
|
||||
@ -43,7 +44,7 @@ func TestURLErrorNotExistNoUpdate(t *testing.T) {
|
||||
|
||||
func TestExtractFromHttpBadness(t *testing.T) {
|
||||
ch := make(chan interface{}, 1)
|
||||
c := sourceURL{"http://localhost:49575/_not_found_", "other", ch, nil}
|
||||
c := sourceURL{"http://localhost:49575/_not_found_", http.Header{}, "other", ch, nil, 0}
|
||||
if err := c.extractFromURL(); err == nil {
|
||||
t.Errorf("Expected error")
|
||||
}
|
||||
@ -112,7 +113,7 @@ func TestExtractInvalidPods(t *testing.T) {
|
||||
testServer := httptest.NewServer(&fakeHandler)
|
||||
defer testServer.Close()
|
||||
ch := make(chan interface{}, 1)
|
||||
c := sourceURL{testServer.URL, "localhost", ch, nil}
|
||||
c := sourceURL{testServer.URL, http.Header{}, "localhost", ch, nil, 0}
|
||||
if err := c.extractFromURL(); err == nil {
|
||||
t.Errorf("%s: Expected error", testCase.desc)
|
||||
}
|
||||
@ -259,7 +260,7 @@ func TestExtractPodsFromHTTP(t *testing.T) {
|
||||
testServer := httptest.NewServer(&fakeHandler)
|
||||
defer testServer.Close()
|
||||
ch := make(chan interface{}, 1)
|
||||
c := sourceURL{testServer.URL, hostname, ch, nil}
|
||||
c := sourceURL{testServer.URL, http.Header{}, hostname, ch, nil, 0}
|
||||
if err := c.extractFromURL(); err != nil {
|
||||
t.Errorf("%s: Unexpected error: %v", testCase.desc, err)
|
||||
continue
|
||||
@ -276,3 +277,47 @@ func TestExtractPodsFromHTTP(t *testing.T) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestURLWithHeader(t *testing.T) {
|
||||
pod := &api.Pod{
|
||||
TypeMeta: api.TypeMeta{
|
||||
APIVersion: testapi.Version(),
|
||||
Kind: "Pod",
|
||||
},
|
||||
ObjectMeta: api.ObjectMeta{
|
||||
Name: "foo",
|
||||
UID: "111",
|
||||
Namespace: "mynamespace",
|
||||
},
|
||||
Spec: api.PodSpec{
|
||||
NodeName: "localhost",
|
||||
Containers: []api.Container{{Name: "1", Image: "foo", ImagePullPolicy: api.PullAlways}},
|
||||
},
|
||||
}
|
||||
data, err := json.Marshal(pod)
|
||||
if err != nil {
|
||||
t.Fatalf("Unexpected json marshalling error: %v", err)
|
||||
}
|
||||
fakeHandler := util.FakeHandler{
|
||||
StatusCode: 200,
|
||||
ResponseBody: string(data),
|
||||
}
|
||||
testServer := httptest.NewServer(&fakeHandler)
|
||||
defer testServer.Close()
|
||||
ch := make(chan interface{}, 1)
|
||||
header := make(http.Header)
|
||||
header.Set("Metadata-Flavor", "Google")
|
||||
c := sourceURL{testServer.URL, header, "localhost", ch, nil, 0}
|
||||
if err := c.extractFromURL(); err != nil {
|
||||
t.Fatalf("Unexpected error extracting from URL: %v", err)
|
||||
}
|
||||
update := (<-ch).(kubelet.PodUpdate)
|
||||
|
||||
headerVal := fakeHandler.RequestReceived.Header["Metadata-Flavor"]
|
||||
if len(headerVal) != 1 || headerVal[0] != "Google" {
|
||||
t.Errorf("Header missing expected entry %v. Got %v", header, fakeHandler.RequestReceived.Header)
|
||||
}
|
||||
if len(update.Pods) != 1 {
|
||||
t.Errorf("Received wrong number of pods, expected one: %v", update.Pods)
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user