Merge pull request #262 from jjhuff/fix_etcd_watches

Watch and Get on the same Etcd key.
This commit is contained in:
Daniel Smith 2014-06-26 19:04:48 -07:00
commit e472d60578
2 changed files with 8 additions and 6 deletions

View File

@ -25,6 +25,7 @@ import (
"net/http" "net/http"
"os" "os"
"os/exec" "os/exec"
"path"
"path/filepath" "path/filepath"
"sort" "sort"
"strconv" "strconv"
@ -539,7 +540,7 @@ func (kl *Kubelet) ResponseToManifests(response *etcd.Response) ([]api.Container
} }
func (kl *Kubelet) getKubeletStateFromEtcd(key string, updateChannel chan<- manifestUpdate) error { func (kl *Kubelet) getKubeletStateFromEtcd(key string, updateChannel chan<- manifestUpdate) error {
response, err := kl.EtcdClient.Get(key+"/kubelet", true, false) response, err := kl.EtcdClient.Get(key, true, false)
if err != nil { if err != nil {
if util.IsEtcdNotFound(err) { if util.IsEtcdNotFound(err) {
return nil return nil
@ -561,7 +562,8 @@ func (kl *Kubelet) getKubeletStateFromEtcd(key string, updateChannel chan<- mani
// The channel to send new configurations across // The channel to send new configurations across
// This function loops forever and is intended to be run in a go routine. // This function loops forever and is intended to be run in a go routine.
func (kl *Kubelet) SyncAndSetupEtcdWatch(updateChannel chan<- manifestUpdate) { func (kl *Kubelet) SyncAndSetupEtcdWatch(updateChannel chan<- manifestUpdate) {
key := "/registry/hosts/" + strings.TrimSpace(kl.Hostname) key := path.Join("registry", "hosts", strings.TrimSpace(kl.Hostname), "kubelet")
// First fetch the initial configuration (watch only gives changes...) // First fetch the initial configuration (watch only gives changes...)
for { for {
err := kl.getKubeletStateFromEtcd(key, updateChannel) err := kl.getKubeletStateFromEtcd(key, updateChannel)

View File

@ -378,7 +378,7 @@ func TestGetKubeletStateFromEtcdNoData(t *testing.T) {
R: &etcd.Response{}, R: &etcd.Response{},
E: nil, E: nil,
} }
err := kubelet.getKubeletStateFromEtcd("/registry/hosts/machine", channel) err := kubelet.getKubeletStateFromEtcd("/registry/hosts/machine/kubelet", channel)
if err == nil { if err == nil {
t.Error("Unexpected no err.") t.Error("Unexpected no err.")
} }
@ -404,7 +404,7 @@ func TestGetKubeletStateFromEtcd(t *testing.T) {
}, },
E: nil, E: nil,
} }
err := kubelet.getKubeletStateFromEtcd("/registry/hosts/machine", channel) err := kubelet.getKubeletStateFromEtcd("/registry/hosts/machine/kubelet", channel)
expectNoError(t, err) expectNoError(t, err)
close(channel) close(channel)
list := reader.GetList() list := reader.GetList()
@ -426,7 +426,7 @@ func TestGetKubeletStateFromEtcdNotFound(t *testing.T) {
ErrorCode: 100, ErrorCode: 100,
}, },
} }
err := kubelet.getKubeletStateFromEtcd("/registry/hosts/machine", channel) err := kubelet.getKubeletStateFromEtcd("/registry/hosts/machine/kubelet", channel)
expectNoError(t, err) expectNoError(t, err)
close(channel) close(channel)
list := reader.GetList() list := reader.GetList()
@ -448,7 +448,7 @@ func TestGetKubeletStateFromEtcdError(t *testing.T) {
ErrorCode: 200, // non not found error ErrorCode: 200, // non not found error
}, },
} }
err := kubelet.getKubeletStateFromEtcd("/registry/hosts/machine", channel) err := kubelet.getKubeletStateFromEtcd("/registry/hosts/machine/kubelet", channel)
if err == nil { if err == nil {
t.Error("Unexpected non-error") t.Error("Unexpected non-error")
} }