diff --git a/.travis.yml b/.travis.yml index b76be8f8d29..639ae950665 100644 --- a/.travis.yml +++ b/.travis.yml @@ -1,7 +1,6 @@ language: go go: - - 1.2 - 1.3 - tip diff --git a/hack/test-go.sh b/hack/test-go.sh index edb0018d9c7..2755c829637 100755 --- a/hack/test-go.sh +++ b/hack/test-go.sh @@ -39,10 +39,10 @@ find_test_dirs() { cd "${KUBE_TARGET}" if [ "$1" != "" ]; then - go test -cover -coverprofile="tmp.out" "$KUBE_GO_PACKAGE/$1" + go test -race -cover -coverprofile="tmp.out" "$KUBE_GO_PACKAGE/$1" exit 0 fi for package in $(find_test_dirs); do - go test -cover -coverprofile="tmp.out" "${KUBE_GO_PACKAGE}/${package}" + go test -race -cover -coverprofile="tmp.out" "${KUBE_GO_PACKAGE}/${package}" done diff --git a/pkg/kubelet/kubelet.go b/pkg/kubelet/kubelet.go index b5b4d40fa60..ea1392fa8e9 100644 --- a/pkg/kubelet/kubelet.go +++ b/pkg/kubelet/kubelet.go @@ -689,11 +689,13 @@ func (kl *Kubelet) syncManifest(manifest *api.ContainerManifest, keepChannel cha return nil } +type empty struct{} + // Sync the configured list of containers (desired state) with the host current state func (kl *Kubelet) SyncManifests(config []api.ContainerManifest) error { glog.Infof("Desired: %+v", config) var err error - dockerIdsToKeep := map[DockerID]bool{} + dockerIdsToKeep := map[DockerID]empty{} keepChannel := make(chan DockerID) waitGroup := sync.WaitGroup{} @@ -711,15 +713,18 @@ func (kl *Kubelet) SyncManifests(config []api.ContainerManifest) error { } }(ix) } + ch := make(chan bool) go func() { for id := range keepChannel { - dockerIdsToKeep[id] = true + dockerIdsToKeep[id] = empty{} } + ch <- true }() if len(config) > 0 { waitGroup.Wait() - close(keepChannel) } + close(keepChannel) + <-ch // Kill any containers we don't need existingContainers, err := kl.getDockerContainers() @@ -728,7 +733,7 @@ func (kl *Kubelet) SyncManifests(config []api.ContainerManifest) error { return err } for id, container := range existingContainers { - if !dockerIdsToKeep[id] { + if _, ok := dockerIdsToKeep[id]; !ok { glog.Infof("Killing: %s", id) err = kl.killContainer(container) if err != nil { diff --git a/pkg/proxy/config/config.go b/pkg/proxy/config/config.go index aef74a093d8..99db95f9b06 100644 --- a/pkg/proxy/config/config.go +++ b/pkg/proxy/config/config.go @@ -94,8 +94,8 @@ type ServiceConfig struct { endpointsNotifyChannel chan string } -func NewServiceConfig() ServiceConfig { - config := ServiceConfig{ +func NewServiceConfig() *ServiceConfig { + config := &ServiceConfig{ serviceConfigSources: make(map[string]chan ServiceUpdate), endpointsConfigSources: make(map[string]chan EndpointsUpdate), serviceHandlers: make([]ServiceConfigHandler, 10), @@ -130,6 +130,7 @@ func (impl *ServiceConfig) ServiceChannelListener(source string, listenChannel c for { select { case update := <-listenChannel: + impl.configLock.Lock() switch update.Op { case ADD: glog.Infof("Adding new service from source %s : %v", source, update.Services) @@ -152,7 +153,6 @@ func (impl *ServiceConfig) ServiceChannelListener(source string, listenChannel c glog.Infof("Received invalid update type: %v", update) continue } - impl.configLock.Lock() impl.serviceConfig[source] = serviceMap impl.configLock.Unlock() impl.serviceNotifyChannel <- source @@ -165,6 +165,7 @@ func (impl *ServiceConfig) EndpointsChannelListener(source string, listenChannel for { select { case update := <-listenChannel: + impl.configLock.Lock() switch update.Op { case ADD: glog.Infof("Adding a new endpoint %v", update) @@ -188,7 +189,6 @@ func (impl *ServiceConfig) EndpointsChannelListener(source string, listenChannel glog.Infof("Received invalid update type: %v", update) continue } - impl.configLock.Lock() impl.endpointConfig[source] = endpointMap impl.configLock.Unlock() impl.endpointsNotifyChannel <- source diff --git a/pkg/tools/fake_etcd_client.go b/pkg/tools/fake_etcd_client.go index 47509bdff8a..dc08b9c3c5a 100644 --- a/pkg/tools/fake_etcd_client.go +++ b/pkg/tools/fake_etcd_client.go @@ -18,7 +18,6 @@ package tools import ( "fmt" - "sync" "testing" "github.com/coreos/go-etcd/etcd" @@ -30,8 +29,7 @@ type EtcdResponseWithError struct { } type FakeEtcdClient struct { - condWatchCompleted *sync.Cond - condLock sync.Mutex + watchCompletedChan chan bool Data map[string]EtcdResponseWithError DeletedKeys []string @@ -59,12 +57,11 @@ func MakeFakeEtcdClient(t *testing.T) *FakeEtcdClient { // They are only available when Watch() is called. If users of // FakeEtcdClient want to use any of these channels, they have to call // WaitForWatchCompletion before any operation on these channels. - // Internally, FakeEtcdClient use condWatchCompleted to indicate if the + // Internally, FakeEtcdClient use watchCompletedChan to indicate if the // Watch() method has been called. WaitForWatchCompletion() will wait - // on condWatchCompleted. By the end of the Watch() method, it will - // call Broadcast() on condWatchCompleted, which will awake any - // goroutine waiting on this condition. - ret.condWatchCompleted = sync.NewCond(&ret.condLock) + // on this channel. WaitForWatchCompletion() will return only when + // WatchResponse, WatchInjectError and WatchStop are ready to read/write. + ret.watchCompletedChan = make(chan bool) return ret } @@ -116,9 +113,7 @@ func (f *FakeEtcdClient) Delete(key string, recursive bool) (*etcd.Response, err } func (f *FakeEtcdClient) WaitForWatchCompletion() { - f.condLock.Lock() - defer f.condLock.Unlock() - f.condWatchCompleted.Wait() + <-f.watchCompletedChan } func (f *FakeEtcdClient) Watch(prefix string, waitIndex uint64, recursive bool, receiver chan *etcd.Response, stop chan bool) (*etcd.Response, error) { @@ -129,8 +124,7 @@ func (f *FakeEtcdClient) Watch(prefix string, waitIndex uint64, recursive bool, defer close(injectedError) f.WatchInjectError = injectedError - f.condWatchCompleted.Broadcast() - + f.watchCompletedChan <- true select { case <-stop: return nil, etcd.ErrWatchStoppedByUser