mirror of
https://github.com/k3s-io/kubernetes.git
synced 2025-07-27 21:47:07 +00:00
manifest_url needs to take a single ContainerManifest.
This commit is contained in:
parent
11963ecf35
commit
77af24e7dc
@ -94,10 +94,14 @@ const (
|
|||||||
func (kl *Kubelet) RunKubelet(file, manifest_url, etcd_servers, address string, port uint) {
|
func (kl *Kubelet) RunKubelet(file, manifest_url, etcd_servers, address string, port uint) {
|
||||||
updateChannel := make(chan manifestUpdate)
|
updateChannel := make(chan manifestUpdate)
|
||||||
if file != "" {
|
if file != "" {
|
||||||
go util.Forever(func() { kl.WatchFile(file, updateChannel) }, 20*time.Second)
|
go util.Forever(func() { kl.WatchFile(file, updateChannel) }, kl.FileCheckFrequency)
|
||||||
}
|
}
|
||||||
if manifest_url != "" {
|
if manifest_url != "" {
|
||||||
go util.Forever(func() { kl.WatchHTTP(manifest_url, updateChannel) }, 20*time.Second)
|
go util.Forever(func() {
|
||||||
|
if err := kl.extractFromHTTP(manifest_url, updateChannel); err != nil {
|
||||||
|
log.Printf("Error syncing http: %#v", err)
|
||||||
|
}
|
||||||
|
}, kl.HTTPCheckFrequency)
|
||||||
}
|
}
|
||||||
if etcd_servers != "" {
|
if etcd_servers != "" {
|
||||||
servers := []string{etcd_servers}
|
servers := []string{etcd_servers}
|
||||||
@ -434,69 +438,64 @@ func (kl *Kubelet) extractMultipleFromReader(reader io.Reader) ([]api.ContainerM
|
|||||||
return manifests, nil
|
return manifests, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (kl *Kubelet) extractSingleFromReader(reader io.Reader) (api.ContainerManifest, error) {
|
||||||
|
var manifest api.ContainerManifest
|
||||||
|
data, err := ioutil.ReadAll(reader)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("Couldn't read from reader: %v", err)
|
||||||
|
return manifest, err
|
||||||
|
}
|
||||||
|
if err = kl.ExtractYAMLData(data, &manifest); err != nil {
|
||||||
|
return manifest, err
|
||||||
|
}
|
||||||
|
return manifest, nil
|
||||||
|
}
|
||||||
|
|
||||||
// Watch a file for changes to the set of pods that should run on this Kubelet
|
// Watch a file for changes to the set of pods that should run on this Kubelet
|
||||||
// This function loops forever and is intended to be run as a goroutine
|
// This function loops forever and is intended to be run as a goroutine
|
||||||
func (kl *Kubelet) WatchFile(file string, updateChannel chan<- manifestUpdate) {
|
func (kl *Kubelet) WatchFile(file string, updateChannel chan<- manifestUpdate) {
|
||||||
for {
|
var err error
|
||||||
var err error
|
|
||||||
|
|
||||||
time.Sleep(kl.FileCheckFrequency)
|
fileInfo, err := os.Stat(file)
|
||||||
|
if err != nil {
|
||||||
fileInfo, err := os.Stat(file)
|
log.Printf("Error polling file: %#v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if fileInfo.IsDir() {
|
||||||
|
manifests, err := kl.extractFromDir(file)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("Error polling dir: %#v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
updateChannel <- manifestUpdate{fileSource, manifests}
|
||||||
|
} else {
|
||||||
|
manifest, err := kl.extractFromFile(file)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("Error polling file: %#v", err)
|
log.Printf("Error polling file: %#v", err)
|
||||||
continue
|
return
|
||||||
}
|
|
||||||
if fileInfo.IsDir() {
|
|
||||||
manifests, err := kl.extractFromDir(file)
|
|
||||||
if err != nil {
|
|
||||||
log.Printf("Error polling dir: %#v", err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
updateChannel <- manifestUpdate{fileSource, manifests}
|
|
||||||
} else {
|
|
||||||
manifest, err := kl.extractFromFile(file)
|
|
||||||
if err != nil {
|
|
||||||
log.Printf("Error polling file: %#v", err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
updateChannel <- manifestUpdate{fileSource, []api.ContainerManifest{manifest}}
|
|
||||||
}
|
}
|
||||||
|
updateChannel <- manifestUpdate{fileSource, []api.ContainerManifest{manifest}}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (kl *Kubelet) extractFromHTTP(url string, updateChannel chan<- manifestUpdate) error {
|
func (kl *Kubelet) extractFromHTTP(url string, updateChannel chan<- manifestUpdate) error {
|
||||||
client := &http.Client{}
|
|
||||||
request, err := http.NewRequest("GET", url, nil)
|
request, err := http.NewRequest("GET", url, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
response, err := client.Do(request)
|
response, err := http.DefaultClient.Do(request)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
defer response.Body.Close()
|
defer response.Body.Close()
|
||||||
manifests, err := kl.extractMultipleFromReader(response.Body)
|
manifest, err := kl.extractSingleFromReader(response.Body)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
updateChannel <- manifestUpdate{httpClientSource, manifests}
|
updateChannel <- manifestUpdate{httpClientSource, []api.ContainerManifest{manifest}}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// Watch an HTTP endpoint for changes to the set of pods that should run on this Kubelet
|
|
||||||
// This function runs forever and is intended to be run as a goroutine
|
|
||||||
func (kl *Kubelet) WatchHTTP(url string, updateChannel chan<- manifestUpdate) {
|
|
||||||
for {
|
|
||||||
var err error
|
|
||||||
time.Sleep(kl.HTTPCheckFrequency)
|
|
||||||
err = kl.extractFromHTTP(url, updateChannel)
|
|
||||||
if err != nil {
|
|
||||||
log.Printf("Error syncing http: %#v", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Take an etcd Response object, and turn it into a structured list of containers
|
// Take an etcd Response object, and turn it into a structured list of containers
|
||||||
// Return a list of containers, or an error if one occurs.
|
// Return a list of containers, or an error if one occurs.
|
||||||
func (kl *Kubelet) ResponseToManifests(response *etcd.Response) ([]api.ContainerManifest, error) {
|
func (kl *Kubelet) ResponseToManifests(response *etcd.Response) ([]api.ContainerManifest, error) {
|
||||||
|
@ -835,7 +835,9 @@ func TestExtractFromHttp(t *testing.T) {
|
|||||||
manifests := []api.ContainerManifest{
|
manifests := []api.ContainerManifest{
|
||||||
{Id: "foo"},
|
{Id: "foo"},
|
||||||
}
|
}
|
||||||
data, err := json.Marshal(manifests)
|
// TODO: provide a mechanism for taking arrays of
|
||||||
|
// manifests or a single manifest.
|
||||||
|
data, err := json.Marshal(manifests[0])
|
||||||
|
|
||||||
fakeHandler := util.FakeHandler{
|
fakeHandler := util.FakeHandler{
|
||||||
StatusCode: 200,
|
StatusCode: 200,
|
||||||
|
Loading…
Reference in New Issue
Block a user